Coverage for heritrace/utils/shacl_validation.py: 92%
205 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-01 22:12 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-01 22:12 +0000
1import re
2from collections import defaultdict
3from typing import Dict, List, Optional, Tuple, Union
5import validators
6from flask_babel import gettext
7from heritrace.extensions import get_custom_filter, get_shacl_graph
8from heritrace.utils.sparql_utils import fetch_data_graph_for_subject
9from heritrace.utils.display_rules_utils import get_highest_priority_class
10from rdflib import RDF, XSD, Literal, URIRef
11from rdflib.plugins.sparql import prepareQuery
12from heritrace.utils.datatypes import DATATYPE_MAPPING
def get_valid_predicates(
    triples: List[Tuple[URIRef, URIRef, Union[URIRef, Literal]]],
    highest_priority_class: URIRef
) -> Tuple[List[str], List[str], Dict, Dict, Dict, List[str]]:
    """Work out which predicates may be added to or removed from an entity.

    The SHACL graph is queried for the property shapes whose
    ``sh:targetClass`` is ``highest_priority_class``; cardinalities are then
    compared with how often each predicate occurs in ``triples``.

    Args:
        triples: The (subject, predicate, object) triples describing the entity.
        highest_priority_class: The class whose shapes are looked up.

    Returns:
        A 6-tuple ``(can_be_added, can_be_deleted, datatypes,
        mandatory_values, optional_values, valid_predicates)``:

        * ``can_be_added``: predicates whose ``sh:maxCount`` is not reached.
        * ``can_be_deleted``: predicates whose current count differs from
          ``sh:minCount`` (0 when the shape declares none).
        * ``datatypes``: predicate -> list of datatype IRIs (``xsd:string``
          when the shape declares none).
        * ``mandatory_values``: predicate -> list of ``sh:hasValue`` values.
        * ``optional_values``: predicate -> list of ``sh:in`` values.
        * ``valid_predicates``: the predicates that have a property shape.

        When the entity has no ``rdf:type`` triple, the SHACL graph is empty,
        or no shape matches the class, nothing is constrained: every existing
        predicate is reported as addable, deletable and valid, typed as
        ``xsd:string``, and both value mappings are empty. Note that in the
        constrained case ``valid_predicates`` is a ``set`` while in the
        unconstrained case it is a ``list``.
    """
    shacl = get_shacl_graph()

    existing_predicates = [triple[1] for triple in triples]

    # Count occurrences in a single pass instead of calling list.count()
    # once per distinct predicate.
    predicate_counts: Dict[str, int] = {}
    for existing_predicate in existing_predicates:
        key = str(existing_predicate)
        predicate_counts[key] = predicate_counts.get(key, 0) + 1

    def _unconstrained():
        # Fresh containers on every call so callers can mutate them freely.
        return (
            [str(predicate) for predicate in existing_predicates],
            [str(predicate) for predicate in existing_predicates],
            {str(predicate): XSD.string for predicate in existing_predicates},
            dict(),
            dict(),
            [str(predicate) for predicate in existing_predicates],
        )

    has_type = any(triple[1] == RDF.type for triple in triples)
    if not has_type or not shacl:
        return _unconstrained()

    query_string = f"""
        SELECT ?predicate ?datatype ?maxCount ?minCount ?hasValue (GROUP_CONCAT(?optionalValue; separator=",") AS ?optionalValues) WHERE {{
            ?shape sh:targetClass ?type ;
                sh:property ?property .
            VALUES ?type {{<{highest_priority_class}>}}
            ?property sh:path ?predicate .
            OPTIONAL {{?property sh:datatype ?datatype .}}
            OPTIONAL {{?property sh:maxCount ?maxCount .}}
            OPTIONAL {{?property sh:minCount ?minCount .}}
            OPTIONAL {{?property sh:hasValue ?hasValue .}}
            OPTIONAL {{
                ?property sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?property sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
            }}
            FILTER (isURI(?predicate))
        }}
        GROUP BY ?predicate ?datatype ?maxCount ?minCount ?hasValue
    """

    query = prepareQuery(
        query_string,
        initNs={
            "sh": "http://www.w3.org/ns/shacl#",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        },
    )

    # Materialise the result so that emptiness can be checked reliably.
    results_list = list(shacl.query(query))

    # No rows means no shape is defined for this class: everything is
    # allowed, exactly as if there were no SHACL at all.
    if not results_list:
        return _unconstrained()

    # One (predicate, constraints) pair per result row.
    constraints = [
        (
            str(row.predicate),
            {
                "min": 0 if row.minCount is None else int(row.minCount),
                "max": None if row.maxCount is None else str(row.maxCount),
                "hasValue": row.hasValue,
                "optionalValues": (
                    row.optionalValues.split(",") if row.optionalValues else []
                ),
            },
        )
        for row in results_list
    ]

    can_be_added = set()
    can_be_deleted = set()
    mandatory_values = defaultdict(list)
    optional_values = dict()
    for predicate, ranges in constraints:
        optional_values.setdefault(predicate, list()).extend(
            ranges["optionalValues"]
        )

        if ranges["hasValue"]:
            # A fixed value is mandatory: it is neither addable nor deletable.
            mandatory_values[predicate].append(str(ranges["hasValue"]))
            continue

        current_count = predicate_counts.get(predicate, 0)
        max_reached = (
            ranges["max"] is not None and int(ranges["max"]) <= current_count
        )
        if not max_reached:
            can_be_added.add(predicate)
        # Deleting is refused only when the entity holds exactly the minimum.
        if ranges["min"] != current_count:
            can_be_deleted.add(predicate)

    datatypes = defaultdict(list)
    for row in results_list:
        if row.datatype:
            datatypes[str(row.predicate)].append(str(row.datatype))
        else:
            datatypes[str(row.predicate)].append(str(XSD.string))

    return (
        list(can_be_added),
        list(can_be_deleted),
        dict(datatypes),
        mandatory_values,
        optional_values,
        {predicate for predicate, _ in constraints},
    )
def _coerce_unconstrained_value(new_value, old_value, default_datatype=None):
    """Build the RDF term for ``new_value`` when no SHACL constraint applies.

    URLs become ``URIRef``; anything else becomes a ``Literal`` that keeps the
    datatype of ``old_value`` when it has one, else ``default_datatype``.
    """
    if validators.url(new_value):
        return URIRef(new_value)
    if isinstance(old_value, Literal) and old_value.datatype:
        return Literal(new_value, datatype=old_value.datatype)
    return Literal(new_value, datatype=default_datatype)


def validate_new_triple(
    subject, predicate, new_value, action: str, old_value=None, entity_types=None, entity_shape=None
):
    """Validate a value for ``subject``/``predicate`` against the SHACL graph.

    Args:
        subject: IRI of the entity being edited.
        predicate: IRI of the property being edited.
        new_value: The proposed value (ignored beyond cardinality checks when
            ``action`` is ``"delete"``).
        action: ``"create"`` or ``"delete"``; any other value is treated as an
            update that leaves the number of values unchanged.
        old_value: The value being replaced, if any. When a term with the same
            string form exists in the data graph, that term is used instead.
        entity_types: Type IRI, or list of type IRIs, used when the data graph
            holds no ``rdf:type`` for ``subject``. The list is not modified.
        entity_shape: Accepted for interface compatibility; not read here.

    Returns:
        A 3-tuple ``(valid_value, old_value, error_message)``. On success
        ``valid_value`` is the ``URIRef``/``Literal`` to store (``None`` for a
        successful delete) and ``error_message`` is ``""``. On failure
        ``valid_value`` is ``None`` and ``error_message`` explains why.
    """
    data_graph = fetch_data_graph_for_subject(subject)
    if old_value is not None:
        matching_triples = [
            triple[2]
            for triple in data_graph.triples((URIRef(subject), URIRef(predicate), None))
            if str(triple[2]) == str(old_value)
        ]
        # Only update old_value if we found a match in the graph
        if matching_triples:
            old_value = matching_triples[0]

    if not len(get_shacl_graph()):
        # Without SHACL any value is accepted; the literal stays untyped
        # unless the old value carried a datatype.
        return _coerce_unconstrained_value(new_value, old_value), old_value, ""

    s_types = [
        triple[2] for triple in data_graph.triples((URIRef(subject), RDF.type, None))
    ]
    # NOTE(review): computed from the graph types only, i.e. before the
    # entity_types fallback below - confirm this is intended.
    highest_priority_class = get_highest_priority_class(s_types)

    if entity_types and not s_types:
        # Copy the list: s_types is extended below and the caller's argument
        # must not be mutated as a side effect.
        if isinstance(entity_types, list):
            s_types = list(entity_types)
        else:
            s_types = [entity_types]

    # Get types for entities that have this subject as their object
    # This is crucial for proper SHACL validation in cases where constraints depend on the context
    # Example: When validating an identifier's value (e.g., DOI, ISSN, ORCID):
    # - The identifier itself is of type datacite:Identifier
    # - But its format constraints depend on what owns it:
    #   * A DOI for an article follows one pattern
    #   * An ISSN for a journal follows another
    #   * An ORCID for a person follows yet another
    # By including these "inverse" types, we ensure validation considers the full context
    inverse_types = []
    for referrer, _, _ in data_graph.triples((None, None, URIRef(subject))):
        # Collect the types of the entity that has the subject as its object
        inverse_types.extend(
            t[2] for t in data_graph.triples((referrer, RDF.type, None))
        )
    s_types.extend(inverse_types)

    query = f"""
        PREFIX sh: <http://www.w3.org/ns/shacl#>
        SELECT DISTINCT ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
            (GROUP_CONCAT(DISTINCT COALESCE(?optionalValue, ""); separator=",") AS ?optionalValues)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionPath, ""); separator=",") AS ?conditionPaths)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionValue, ""); separator=",") AS ?conditionValues)
        WHERE {{
            ?shape sh:targetClass ?type ;
                sh:property ?propertyShape .
            ?propertyShape sh:path ?path .
            FILTER(?path = <{predicate}>)
            VALUES ?type {{<{'> <'.join(s_types)}>}}
            OPTIONAL {{?propertyShape sh:datatype ?datatype .}}
            OPTIONAL {{?propertyShape sh:maxCount ?maxCount .}}
            OPTIONAL {{?propertyShape sh:minCount ?minCount .}}
            OPTIONAL {{?propertyShape sh:class ?a_class .}}
            OPTIONAL {{
                ?propertyShape sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
                OPTIONAL {{?orConstraint sh:class ?class .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:classIn ?classInList .
                ?classInList rdf:rest*/rdf:first ?classIn .
            }}
            OPTIONAL {{
                ?propertyShape sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?propertyShape sh:pattern ?pattern .
                OPTIONAL {{?propertyShape sh:message ?message .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:condition ?conditionNode .
                ?conditionNode sh:path ?conditionPath ;
                    sh:hasValue ?conditionValue .
            }}
        }}
        GROUP BY ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
    """
    shacl = get_shacl_graph()
    custom_filter = get_custom_filter()

    # Materialise the result so that emptiness can be checked reliably.
    results_list = list(shacl.query(query))
    shapes = [row.shape for row in results_list if row.shape is not None]
    current_shape = shapes[0] if shapes else None

    if not results_list:
        if not s_types:
            return (
                None,
                old_value,
                gettext(
                    "No entity type specified"
                ),
            )

        # No shape constrains this property for these classes: everything is
        # allowed, but plain literals default to xsd:string here.
        return (
            _coerce_unconstrained_value(new_value, old_value, XSD.string),
            old_value,
            "",
        )

    def _label(term):
        # Human-readable rendering of a term in the context of this entity.
        return custom_filter.human_readable_predicate(
            term, (highest_priority_class, current_shape)
        )

    def _type_error(type_labels):
        # Failure tuple for a value that matches none of the allowed types.
        return (
            None,
            old_value,
            gettext(
                "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
                new_value=_label(new_value),
                property=_label(predicate),
                o_types=", ".join(
                    [f"<code>{type_label}</code>" for type_label in type_labels]
                ),
            ),
        )

    datatypes = [row.datatype for row in results_list if row.datatype is not None]
    classes = [row.a_class for row in results_list if row.a_class]
    classes.extend([row.classIn for row in results_list if row.classIn])
    optional_values_str = [row.optionalValues for row in results_list if row.optionalValues]
    optional_values_str = optional_values_str[0] if optional_values_str else ""
    optional_values = [value for value in optional_values_str.split(",") if value]

    # NOTE(review): rows are filtered by truthiness, so a count whose value is
    # falsy is treated like an absent one - confirm against rdflib semantics.
    max_count = [row.maxCount for row in results_list if row.maxCount]
    min_count = [row.minCount for row in results_list if row.minCount]
    max_count = int(max_count[0]) if max_count else None
    min_count = int(min_count[0]) if min_count else None

    current_count = len(
        list(data_graph.triples((URIRef(subject), URIRef(predicate), None)))
    )

    # Number of values the property would hold after the requested action.
    if action == "create":
        new_count = current_count + 1
    elif action == "delete":
        new_count = current_count - 1
    else:  # update
        new_count = current_count

    if max_count is not None and new_count > max_count:
        value = gettext("value") if max_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s allows at most %(max_count)s %(value)s",
                predicate=_label(predicate),
                max_count=max_count,
                value=value,
            ),
        )
    if min_count is not None and new_count < min_count:
        value = gettext("value") if min_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s requires at least %(min_count)s %(value)s",
                predicate=_label(predicate),
                min_count=min_count,
                value=value,
            ),
        )

    # For delete operations only the cardinality constraints (checked above)
    # matter; the datatype or class of the deleted value is irrelevant.
    if action == "delete":
        return None, old_value, ""

    if optional_values and new_value not in optional_values:
        optional_value_labels = [_label(value) for value in optional_values]
        return (
            None,
            old_value,
            gettext(
                "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires one of the following values: %(o_values)s",
                new_value=_label(new_value),
                property=_label(predicate),
                o_values=", ".join(
                    [f"<code>{label}</code>" for label in optional_value_labels]
                ),
            ),
        )

    # Check pattern constraints
    for row in results_list:
        if not row.pattern:
            continue

        # A pattern may be guarded by conditions (path/value pairs that must
        # hold on the subject for the pattern to apply).
        condition_paths = row.conditionPaths.split(",") if row.conditionPaths else []
        condition_values = row.conditionValues.split(",") if row.conditionValues else []
        conditions_met = True
        for path, value in zip(condition_paths, condition_values):
            if path and value:
                # Check if the condition triple exists in the data graph
                condition_exists = any(
                    data_graph.triples((URIRef(subject), URIRef(path), URIRef(value)))
                )
                if not condition_exists:
                    conditions_met = False
                    break

        # Only validate pattern if conditions are met
        if conditions_met:
            pattern = str(row.pattern)
            if not re.match(pattern, new_value):
                error_message = str(row.message) if row.message else f"Value must match pattern: {pattern}"
                return None, old_value, error_message

    if classes:
        if not validators.url(new_value):
            return _type_error(
                custom_filter.human_readable_class((c, current_shape))
                for c in classes
            )
        valid_value = convert_to_matching_class(
            new_value, classes, entity_types=s_types
        )
        if valid_value is None:
            return _type_error(
                custom_filter.human_readable_class((c, current_shape))
                for c in classes
            )
        return valid_value, old_value, ""
    elif datatypes:
        valid_value = convert_to_matching_literal(new_value, datatypes)
        if valid_value is None:
            datatype_labels = [get_datatype_label(dt) for dt in datatypes]
            return _type_error(datatype_labels)
        return valid_value, old_value, ""

    # Neither datatypes nor classes are specified: derive the term type from
    # old_value and new_value.
    if isinstance(old_value, Literal):
        if old_value.datatype:
            valid_value = Literal(new_value, datatype=old_value.datatype)
        else:
            valid_value = Literal(new_value, datatype=XSD.string)
    elif isinstance(old_value, URIRef):
        # old_value is a URIRef but there is no new value: keep old_value
        if new_value is None:
            return old_value, old_value, ""
        valid_value = URIRef(new_value)
    elif new_value is not None and validators.url(new_value):
        valid_value = URIRef(new_value)
    else:
        valid_value = Literal(new_value, datatype=XSD.string)
    return valid_value, old_value, ""
def convert_to_matching_class(object_value, classes, entity_types=None):
    """Return ``object_value`` as a ``URIRef`` if it satisfies a class constraint.

    Args:
        object_value: Candidate IRI of the object.
        classes: The classes allowed by the constraint.
        entity_types: Optional type, or list of types, used as the object's
            types when the data graph declares none for it.

    Returns:
        ``URIRef(object_value)`` when the object's types overlap ``classes``
        or when ``entity_types`` is truthy; ``None`` when ``classes`` is
        empty, ``object_value`` is ``None`` or not a URL, or nothing matches.
    """
    if object_value is None or not classes:
        return None

    # Only well-formed URLs can denote an entity.
    if not validators.url(str(object_value)):
        return None

    graph = fetch_data_graph_for_subject(object_value)
    found_types = {
        str(type_term)
        for _, _, type_term in graph.triples((URIRef(object_value), RDF.type, None))
    }

    # Fall back to the supplied types when the graph knows none.
    if entity_types and not found_types:
        found_types = (
            set(entity_types) if isinstance(entity_types, list) else {entity_types}
        )

    required = {str(allowed) for allowed in classes}
    if found_types & required:
        return URIRef(object_value)

    # NOTE(review): no type matched at this point, yet the value is still
    # accepted whenever entity_types is supplied - confirm this is intended.
    if entity_types:
        return URIRef(object_value)

    return None
def convert_to_matching_literal(object_value, datatypes):
    """Return ``object_value`` as a ``Literal`` typed with a matching datatype.

    The datatypes are tried in order. For each one the validation function
    registered in ``DATATYPE_MAPPING`` is looked up: if none is registered the
    value is returned as an ``xsd:string`` literal straight away; if the
    function accepts the value, the literal is typed with that datatype.

    Returns:
        The typed ``Literal``, or ``None`` when ``datatypes`` is empty,
        ``object_value`` is ``None`` or no datatype accepts the value.
    """
    if object_value is None or not datatypes:
        return None

    for candidate in datatypes:
        validator = None
        for entry in DATATYPE_MAPPING:
            if str(entry[0]) == str(candidate):
                validator = entry[1]
                break

        if validator is None:
            # Unknown datatype: fall back to a plain string literal.
            return Literal(object_value, datatype=XSD.string)

        if validator(object_value):
            return Literal(object_value, datatype=candidate)

    return None
def get_datatype_label(datatype_uri):
    """Return a human-readable label for a datatype IRI.

    Lookup order: a built-in table of common XSD datatypes, then the labels
    in ``DATATYPE_MAPPING``, then the custom filter. When the custom filter
    merely yields the trailing part of the IRI, the full IRI is returned
    instead. ``None`` is returned for a ``None`` input.
    """
    if datatype_uri is None:
        return None

    uri_text = str(datatype_uri)

    # Common XSD datatypes and their display names.
    builtin_labels = {
        str(XSD.string): "String",
        str(XSD.integer): "Integer",
        str(XSD.int): "Integer",
        str(XSD.float): "Float",
        str(XSD.double): "Double",
        str(XSD.decimal): "Decimal",
        str(XSD.boolean): "Boolean",
        str(XSD.date): "Date",
        str(XSD.time): "Time",
        str(XSD.dateTime): "DateTime",
        str(XSD.anyURI): "URI"
    }
    try:
        return builtin_labels[uri_text]
    except KeyError:
        pass

    # Not a built-in one: look for a label in DATATYPE_MAPPING.
    for mapped_uri, _, mapped_label in DATATYPE_MAPPING:
        if str(mapped_uri) == uri_text:
            return mapped_label

    # Last resort: ask the custom filter, if there is one.
    custom_filter = get_custom_filter()
    if not custom_filter:
        return datatype_uri

    filtered = custom_filter.human_readable_predicate(datatype_uri, (None, None))
    # A label that is just the tail of the IRI adds nothing: prefer the IRI.
    is_only_suffix = (
        filtered and filtered != datatype_uri and datatype_uri.endswith(filtered)
    )
    if is_only_suffix:
        return datatype_uri
    return filtered