Coverage for heritrace / utils / shacl_validation.py: 92%
205 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import re
6from collections import defaultdict
7from typing import Dict, List, Optional, Tuple, Union
9import validators
10from flask_babel import gettext
11from heritrace.extensions import get_custom_filter, get_shacl_graph
12from heritrace.utils.sparql_utils import (fetch_data_graph_for_subject,
13 get_triples_from_graph)
14from heritrace.utils.display_rules_utils import get_highest_priority_class
15from rdflib import RDF, XSD, Literal, URIRef
16from rdflib.plugins.sparql import prepareQuery
17from heritrace.utils.datatypes import DATATYPE_MAPPING
def _unconstrained_predicates(existing_predicates):
    """Build the result returned when no SHACL constraint applies to the entity.

    Every predicate already present is reported as addable, deletable and
    valid, and each one is mapped to ``XSD.string`` as its default datatype.
    Three independent lists are returned so that a caller mutating one of
    them does not affect the others.
    """
    names = [str(predicate) for predicate in existing_predicates]
    return (
        names,
        list(names),
        {name: XSD.string for name in names},
        dict(),
        dict(),
        list(names),
    )


def get_valid_predicates(
    triples: List[Tuple[URIRef, URIRef, Union[URIRef, Literal]]],
    highest_priority_class: URIRef
) -> Tuple[List[str], List[str], Dict, Dict, Dict, List[str]]:
    """Compute which predicates may be added to or deleted from an entity.

    The SHACL graph is queried for the property shapes whose shape targets
    ``highest_priority_class``; their cardinalities are compared with the
    number of times each predicate occurs in ``triples``.

    Args:
        triples: the (subject, predicate, object) triples of the entity.
        highest_priority_class: the class whose shapes are looked up.

    Returns:
        A 6-tuple:

        1. predicates that can still be added (``sh:maxCount`` not reached);
        2. predicates that can be deleted (current count differs from
           ``sh:minCount``, where a missing minimum counts as 0);
        3. predicate -> list of datatype strings (``XSD.string`` when the
           shape declares none);
        4. predicate -> list of ``sh:hasValue`` values (a ``defaultdict``);
        5. predicate -> list of allowed values gathered by the query
           (``sh:in`` members and ``sh:hasValue`` inside ``sh:or``);
        6. the set of all predicates declared by the shapes.

        Predicates carrying ``sh:hasValue`` are listed in (4) only and are
        never reported as addable or deletable. When the entity has no
        ``rdf:type``, the SHACL graph is empty, or no shape matches the
        class, everything is allowed: items 1, 2 and 6 are then plain lists
        of the existing predicates (duplicates included), item 3 maps each
        of them to ``XSD.string`` and items 4 and 5 are empty dicts.
    """
    shacl = get_shacl_graph()

    existing_predicates = [triple[1] for triple in triples]
    # One pass over the predicates instead of list.count() per distinct value.
    predicate_counts = defaultdict(int)
    for predicate in existing_predicates:
        predicate_counts[str(predicate)] += 1
    s_types = [triple[2] for triple in triples if triple[1] == RDF.type]

    if not s_types or not shacl:
        return _unconstrained_predicates(existing_predicates)

    query_string = f"""
        SELECT ?predicate ?datatype ?maxCount ?minCount ?hasValue (GROUP_CONCAT(?optionalValue; separator=",") AS ?optionalValues) WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?property .
            VALUES ?type {{<{highest_priority_class}>}}
            ?property sh:path ?predicate .
            OPTIONAL {{?property sh:datatype ?datatype .}}
            OPTIONAL {{?property sh:maxCount ?maxCount .}}
            OPTIONAL {{?property sh:minCount ?minCount .}}
            OPTIONAL {{?property sh:hasValue ?hasValue .}}
            OPTIONAL {{
                ?property sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?property sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                OPTIONAL {{?orConstraint sh:datatype ?datatype .}}
                OPTIONAL {{?orConstraint sh:hasValue ?optionalValue .}}
            }}
            FILTER (isURI(?predicate))
        }}
        GROUP BY ?predicate ?datatype ?maxCount ?minCount ?hasValue
    """

    query = prepareQuery(
        query_string,
        initNs={
            "sh": "http://www.w3.org/ns/shacl#",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        },
    )

    # Materialise the result: SPARQL iterators can be misleading about
    # their emptiness.
    results_list = list(shacl.query(query))

    # No rows means no shape is defined for this class: behave as if there
    # were no SHACL at all.
    if not results_list:
        return _unconstrained_predicates(existing_predicates)

    can_be_added = set()
    can_be_deleted = set()
    mandatory_values = defaultdict(list)
    datatypes = defaultdict(list)
    optional_values = dict()
    shape_predicates = set()

    for row in results_list:
        predicate = str(row.predicate)
        shape_predicates.add(predicate)
        current_count = predicate_counts.get(predicate, 0)
        min_count = 0 if row.minCount is None else int(row.minCount)
        max_count = None if row.maxCount is None else int(str(row.maxCount))

        if row.hasValue:
            # A fixed value is mandatory: it is neither addable nor deletable.
            mandatory_values[predicate].append(str(row.hasValue))
        else:
            if max_count is None or max_count > current_count:
                can_be_added.add(predicate)
            # Deleting is refused only when the entity holds exactly the
            # minimum number of values.
            if min_count != current_count:
                can_be_deleted.add(predicate)

        datatypes[predicate].append(
            str(row.datatype) if row.datatype else str(XSD.string)
        )
        optional_values.setdefault(predicate, list()).extend(
            row.optionalValues.split(",") if row.optionalValues else []
        )

    return (
        list(can_be_added),
        list(can_be_deleted),
        dict(datatypes),
        mandatory_values,
        optional_values,
        shape_predicates,
    )
def _coerce_unconstrained_value(new_value, old_value, default_datatype=None):
    """Turn ``new_value`` into an RDF term when no SHACL constraint applies.

    A value accepted by ``validators.url`` becomes a ``URIRef``. Otherwise a
    ``Literal`` is built, reusing the datatype of ``old_value`` when that is
    a typed ``Literal`` and falling back to ``default_datatype`` (``None``
    yields an untyped literal).
    """
    if validators.url(new_value):
        return URIRef(new_value)
    if isinstance(old_value, Literal) and old_value.datatype:
        return Literal(new_value, datatype=old_value.datatype)
    return Literal(new_value, datatype=default_datatype)


def validate_new_triple(
    subject, predicate, new_value, action: str, old_value=None, entity_types=None, entity_shape=None
):
    """Validate a triple change against the SHACL shapes and type the value.

    Args:
        subject: URI of the entity being edited.
        predicate: URI of the property being edited.
        new_value: the raw value supplied for the property.
        action: ``"create"``, ``"delete"`` or anything else, which is
            treated as an update; it only affects the cardinality check.
        old_value: the value being replaced or removed, if any. When a term
            with the same string form exists in the data graph, that term
            is used instead so its datatype is preserved.
        entity_types: type or list of types used when the data graph holds
            no ``rdf:type`` for ``subject``. The caller's list is never
            modified.
        entity_shape: accepted for interface compatibility; not used here.

    Returns:
        A tuple ``(valid_value, old_value, error_message)``. On success
        ``valid_value`` is a ``URIRef`` or ``Literal`` (``None`` for a
        successful delete) and ``error_message`` is ``""``. On failure
        ``valid_value`` is ``None`` and ``error_message`` explains why.
    """
    data_graph = fetch_data_graph_for_subject(subject)
    subject_ref = URIRef(subject)
    predicate_ref = URIRef(predicate)

    if old_value is not None:
        matching_triples = [
            triple[2]
            for triple in get_triples_from_graph(data_graph, (subject_ref, predicate_ref, None))
            if str(triple[2]) == str(old_value)
        ]
        # Only update old_value if we found a match in the graph
        if matching_triples:
            old_value = matching_triples[0]

    shacl = get_shacl_graph()
    if not len(shacl):
        # Without SHACL any value is accepted; keep the old datatype if any.
        return _coerce_unconstrained_value(new_value, old_value), old_value, ""

    s_types = [
        triple[2] for triple in get_triples_from_graph(data_graph, (subject_ref, RDF.type, None))
    ]
    # NOTE(review): computed from the types found in the graph, before the
    # entity_types fallback below is applied - confirm this is intended.
    highest_priority_class = get_highest_priority_class(s_types)

    if entity_types and not s_types:
        # Copy the list: inverse types are appended to s_types below and
        # must not leak into the caller's argument.
        s_types = list(entity_types) if isinstance(entity_types, list) else [entity_types]

    # Get types for entities that have this subject as their object.
    # This is crucial for proper SHACL validation in cases where constraints
    # depend on the context. Example: when validating an identifier's value
    # (e.g., DOI, ISSN, ORCID):
    # - The identifier itself is of type datacite:Identifier
    # - But its format constraints depend on what owns it:
    #   * A DOI for an article follows one pattern
    #   * An ISSN for a journal follows another
    #   * An ORCID for a person follows yet another
    # By including these "inverse" types, validation considers the full context.
    inverse_types = []
    for referrer, _p, _o in get_triples_from_graph(data_graph, (None, None, subject_ref)):
        # Collect the types of the entity that has the subject as its object
        inverse_types.extend(
            t[2] for t in get_triples_from_graph(data_graph, (referrer, RDF.type, None))
        )
    s_types.extend(inverse_types)

    query = f"""
        PREFIX sh: <http://www.w3.org/ns/shacl#>
        SELECT DISTINCT ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
            (GROUP_CONCAT(DISTINCT COALESCE(?optionalValue, ""); separator=",") AS ?optionalValues)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionPath, ""); separator=",") AS ?conditionPaths)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionValue, ""); separator=",") AS ?conditionValues)
        WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?propertyShape .
            ?propertyShape sh:path ?path .
            FILTER(?path = <{predicate}>)
            VALUES ?type {{<{'> <'.join(s_types)}>}}
            OPTIONAL {{?propertyShape sh:datatype ?datatype .}}
            OPTIONAL {{?propertyShape sh:maxCount ?maxCount .}}
            OPTIONAL {{?propertyShape sh:minCount ?minCount .}}
            OPTIONAL {{?propertyShape sh:class ?a_class .}}
            OPTIONAL {{
                ?propertyShape sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
                OPTIONAL {{?orConstraint sh:class ?class .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:classIn ?classInList .
                ?classInList rdf:rest*/rdf:first ?classIn .
            }}
            OPTIONAL {{
                ?propertyShape sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?propertyShape sh:pattern ?pattern .
                OPTIONAL {{?propertyShape sh:message ?message .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:condition ?conditionNode .
                ?conditionNode sh:path ?conditionPath ;
                               sh:hasValue ?conditionValue .
            }}
        }}
        GROUP BY ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
    """
    custom_filter = get_custom_filter()

    # Materialise the result: SPARQL iterators can be misleading about
    # their emptiness.
    results_list = list(shacl.query(query))
    shapes = [row.shape for row in results_list if row.shape is not None]
    current_shape = shapes[0] if shapes else None

    if not results_list:
        if not s_types:
            return (
                None,
                old_value,
                gettext(
                    "No entity type specified"
                ),
            )
        # No shape constrains this property for these classes: everything is
        # allowed, as if there were no SHACL.
        return _coerce_unconstrained_value(new_value, old_value, XSD.string), old_value, ""

    datatypes = [row.datatype for row in results_list if row.datatype is not None]
    classes = [row.a_class for row in results_list if row.a_class]
    classes.extend(row.classIn for row in results_list if row.classIn)
    optional_values_str = next(
        (row.optionalValues for row in results_list if row.optionalValues), ""
    )
    optional_values = [value for value in optional_values_str.split(",") if value]

    # Compare maxCount with None explicitly: an rdflib Literal holding 0 is
    # falsy, and "sh:maxCount 0" is a real constraint that must be kept.
    max_counts = [row.maxCount for row in results_list if row.maxCount is not None]
    # A minCount of 0 imposes no constraint, so dropping falsy values is fine.
    min_counts = [row.minCount for row in results_list if row.minCount]
    max_count = int(max_counts[0]) if max_counts else None
    min_count = int(min_counts[0]) if min_counts else None

    current_count = len(
        list(get_triples_from_graph(data_graph, (subject_ref, predicate_ref, None)))
    )

    if action == "create":
        new_count = current_count + 1
    elif action == "delete":
        new_count = current_count - 1
    else:  # update
        new_count = current_count

    # Context handed to the display filter for every human-readable label.
    label_context = (highest_priority_class, current_shape)

    if max_count is not None and new_count > max_count:
        value = gettext("value") if max_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s allows at most %(max_count)s %(value)s",
                predicate=custom_filter.human_readable_predicate(predicate, label_context),
                max_count=max_count,
                value=value,
            ),
        )
    if min_count is not None and new_count < min_count:
        value = gettext("value") if min_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s requires at least %(min_count)s %(value)s",
                predicate=custom_filter.human_readable_predicate(predicate, label_context),
                min_count=min_count,
                value=value,
            ),
        )

    # For delete operations only the cardinality constraints matter (already
    # checked above); the value being removed needs no datatype/class check.
    if action == "delete":
        return None, old_value, ""

    if optional_values and new_value not in optional_values:
        optional_value_labels = [
            custom_filter.human_readable_predicate(value, label_context)
            for value in optional_values
        ]
        return (
            None,
            old_value,
            gettext(
                "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires one of the following values: %(o_values)s",
                new_value=custom_filter.human_readable_predicate(new_value, label_context),
                property=custom_filter.human_readable_predicate(predicate, label_context),
                o_values=", ".join(
                    [f"<code>{label}</code>" for label in optional_value_labels]
                ),
            ),
        )

    # Check pattern constraints
    for row in results_list:
        if not row.pattern:
            continue
        # A pattern may be guarded by conditions (path/value pairs).
        condition_paths = row.conditionPaths.split(",") if row.conditionPaths else []
        condition_values = row.conditionValues.split(",") if row.conditionValues else []
        conditions_met = True

        for path, value in zip(condition_paths, condition_values):
            if path and value:
                # The condition holds when the triple exists in the data graph
                condition_exists = any(
                    get_triples_from_graph(data_graph, (subject_ref, URIRef(path), URIRef(value)))
                )
                if not condition_exists:
                    conditions_met = False
                    break

        # Only validate the pattern if its conditions are met
        if conditions_met:
            pattern = str(row.pattern)
            if not re.match(pattern, new_value):
                error_message = str(row.message) if row.message else f"Value must match pattern: {pattern}"
                return None, old_value, error_message

    if classes:
        # The value must be a URL and must be accepted by the class check;
        # both failures produce the same error message.
        valid_value = (
            convert_to_matching_class(new_value, classes, entity_types=s_types)
            if validators.url(new_value)
            else None
        )
        if valid_value is None:
            return (
                None,
                old_value,
                gettext(
                    "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
                    new_value=custom_filter.human_readable_predicate(new_value, label_context),
                    property=custom_filter.human_readable_predicate(predicate, label_context),
                    o_types=", ".join(
                        [
                            f"<code>{custom_filter.human_readable_class((c, current_shape))}</code>"
                            for c in classes
                        ]
                    ),
                ),
            )
        return valid_value, old_value, ""
    elif datatypes:
        valid_value = convert_to_matching_literal(new_value, datatypes)
        if valid_value is None:
            datatype_labels = [get_datatype_label(dt) for dt in datatypes]
            return (
                None,
                old_value,
                gettext(
                    "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
                    new_value=custom_filter.human_readable_predicate(new_value, label_context),
                    property=custom_filter.human_readable_predicate(predicate, label_context),
                    o_types=", ".join(
                        [f"<code>{label}</code>" for label in datatype_labels]
                    ),
                ),
            )
        return valid_value, old_value, ""

    # Neither datatypes nor classes are specified: decide the term type from
    # old_value and new_value.
    if isinstance(old_value, Literal):
        valid_value = Literal(new_value, datatype=old_value.datatype or XSD.string)
    elif isinstance(old_value, URIRef):
        # old_value is a URIRef but there is no new value: return old_value
        if new_value is None:
            return old_value, old_value, ""
        valid_value = URIRef(new_value)
    elif new_value is not None and validators.url(new_value):
        valid_value = URIRef(new_value)
    else:
        valid_value = Literal(new_value, datatype=XSD.string)
    return valid_value, old_value, ""
def convert_to_matching_class(object_value, classes, entity_types=None):
    """Return ``object_value`` as a ``URIRef`` when it passes the class check.

    ``None`` is returned when ``classes`` is empty, ``object_value`` is
    ``None`` or is not accepted by ``validators.url``, or when none of the
    object's types is among ``classes`` and no ``entity_types`` were given.
    """
    # Nothing to check against, or nothing to check.
    if not classes or object_value is None:
        return None

    # Only URLs can denote class instances.
    if not validators.url(str(object_value)):
        return None

    # Read the rdf:type values recorded for the object.
    data_graph = fetch_data_graph_for_subject(object_value)
    type_triples = get_triples_from_graph(
        data_graph, (URIRef(object_value), RDF.type, None)
    )
    found_types = set()
    for type_triple in type_triples:
        found_types.add(str(type_triple[2]))

    # With no types in the graph, fall back to the supplied entity types.
    if not found_types and entity_types:
        if isinstance(entity_types, list):
            found_types = set(entity_types)
        else:
            found_types = {entity_types}

    required_classes = {str(required) for required in classes}
    has_matching_type = bool(found_types & required_classes)

    # NOTE(review): whenever entity_types is supplied the value is accepted
    # even without a matching type, which bypasses the class check - confirm
    # that callers really rely on this.
    if has_matching_type or entity_types:
        return URIRef(object_value)
    return None
def convert_to_matching_literal(object_value, datatypes):
    """Build a ``Literal`` typed with the first datatype that accepts the value.

    Each datatype is looked up in ``DATATYPE_MAPPING`` to obtain its
    validation function; the first datatype whose function accepts
    ``object_value`` is used for the returned ``Literal``.

    Datatypes with no validation function in ``DATATYPE_MAPPING`` cannot be
    checked. If none of the known datatypes accepts the value and at least
    one datatype was unknown, the value is returned as an ``XSD.string``
    literal. The fallback is applied only after every datatype has been
    tried, so the result does not depend on where the unknown datatype
    appears in ``datatypes``.

    Returns:
        The typed ``Literal``, or ``None`` when ``datatypes`` is empty,
        ``object_value`` is ``None``, or every datatype is known and all of
        them reject the value.
    """
    # Handle edge cases
    if not datatypes or object_value is None:
        return None

    # First mapping entry wins for a given datatype.
    validation_funcs = {}
    for entry in DATATYPE_MAPPING:
        validation_funcs.setdefault(str(entry[0]), entry[1])

    has_unknown_datatype = False
    for datatype in datatypes:
        validation_func = validation_funcs.get(str(datatype))
        if validation_func is None:
            # Cannot validate this one; keep looking for a datatype we can.
            has_unknown_datatype = True
            continue
        if validation_func(object_value):
            return Literal(object_value, datatype=datatype)

    if has_unknown_datatype:
        return Literal(object_value, datatype=XSD.string)
    return None
def get_datatype_label(datatype_uri):
    """Return a human-readable label for a datatype URI.

    Lookup order: a built-in table of common XSD datatypes, then the labels
    in ``DATATYPE_MAPPING``, then the custom display filter. ``None`` is
    returned for a ``None`` input; the URI itself is returned when no better
    label is available.
    """
    if datatype_uri is None:
        return None

    uri_text = str(datatype_uri)

    # Human-readable labels for common XSD datatypes.
    builtin_labels = {
        str(XSD.string): "String",
        str(XSD.integer): "Integer",
        str(XSD.int): "Integer",
        str(XSD.float): "Float",
        str(XSD.double): "Double",
        str(XSD.decimal): "Decimal",
        str(XSD.boolean): "Boolean",
        str(XSD.date): "Date",
        str(XSD.time): "Time",
        str(XSD.dateTime): "DateTime",
        str(XSD.anyURI): "URI"
    }
    builtin_label = builtin_labels.get(uri_text)
    if builtin_label is not None:
        return builtin_label

    # Next, the labels declared alongside the datatype validators.
    for mapped_uri, _, mapped_label in DATATYPE_MAPPING:
        if str(mapped_uri) == uri_text:
            return mapped_label

    # Last resort: ask the custom display filter, if there is one.
    custom_filter = get_custom_filter()
    if not custom_filter:
        return datatype_uri

    custom_label = custom_filter.human_readable_predicate(datatype_uri, (None, None))
    # When the filter merely returns the tail of the URI, the full URI is
    # returned instead.
    if custom_label and custom_label != datatype_uri and datatype_uri.endswith(custom_label):
        return datatype_uri
    return custom_label