Coverage for heritrace/utils/shacl_validation.py: 92%
205 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-10-13 17:12 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-10-13 17:12 +0000
1import re
2from collections import defaultdict
3from typing import Dict, List, Optional, Tuple, Union
5import validators
6from flask_babel import gettext
7from heritrace.extensions import get_custom_filter, get_shacl_graph
8from heritrace.utils.sparql_utils import fetch_data_graph_for_subject
9from heritrace.utils.display_rules_utils import get_highest_priority_class
10from rdflib import RDF, XSD, Literal, URIRef
11from rdflib.plugins.sparql import prepareQuery
12from heritrace.utils.datatypes import DATATYPE_MAPPING
def get_valid_predicates(
    triples: List[Tuple[URIRef, URIRef, Union[URIRef, Literal]]],
    highest_priority_class: URIRef
) -> Tuple[List[str], List[str], Dict, Dict, Dict, List[str]]:
    """Work out which predicates may be added to or removed from an entity.

    The SHACL graph is queried for the property shapes that target
    ``highest_priority_class``; their cardinality, ``sh:hasValue``,
    ``sh:in`` / ``sh:or`` and ``sh:datatype`` constraints are compared with
    the predicates already present in ``triples``.

    Args:
        triples: The (subject, predicate, object) triples describing the entity.
        highest_priority_class: Class whose shapes drive the validation.

    Returns:
        A 6-tuple ``(can_be_added, can_be_deleted, datatypes,
        mandatory_values, optional_values, valid_predicates)``.

        When the entity has no ``rdf:type``, the SHACL graph is empty/missing,
        or no shape targets the class, everything is allowed: the three
        predicate collections are lists of the existing predicates,
        ``datatypes`` maps each of them to ``XSD.string`` and the two value
        mappings are empty dicts.

        Otherwise ``datatypes`` maps predicate -> list of datatype URIs,
        ``mandatory_values`` maps predicate -> ``sh:hasValue`` values,
        ``optional_values`` maps predicate -> allowed values, and the last
        element is a *set* of every predicate declared by the shapes (the set
        is kept for backward compatibility even though the annotation says
        ``List[str]``).
    """
    shacl = get_shacl_graph()

    existing_predicates = [triple[1] for triple in triples]

    # Single pass instead of calling list.count() once per distinct predicate.
    predicate_counts = defaultdict(int)
    for predicate in existing_predicates:
        predicate_counts[str(predicate)] += 1

    default_datatypes = {
        str(predicate): XSD.string for predicate in existing_predicates
    }
    s_types = [triple[2] for triple in triples if triple[1] == RDF.type]

    def unconstrained_result():
        # Three independent lists, exactly as callers received before.
        return (
            [str(predicate) for predicate in existing_predicates],
            [str(predicate) for predicate in existing_predicates],
            default_datatypes,
            dict(),
            dict(),
            [str(predicate) for predicate in existing_predicates],
        )

    if not s_types or not shacl:
        return unconstrained_result()

    query_string = f"""
        SELECT ?predicate ?datatype ?maxCount ?minCount ?hasValue (GROUP_CONCAT(?optionalValue; separator=",") AS ?optionalValues) WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?property .
            VALUES ?type {{<{highest_priority_class}>}}
            ?property sh:path ?predicate .
            OPTIONAL {{?property sh:datatype ?datatype .}}
            OPTIONAL {{?property sh:maxCount ?maxCount .}}
            OPTIONAL {{?property sh:minCount ?minCount .}}
            OPTIONAL {{?property sh:hasValue ?hasValue .}}
            OPTIONAL {{
                ?property sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?property sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                OPTIONAL {{?orConstraint sh:datatype ?datatype .}}
                OPTIONAL {{?orConstraint sh:hasValue ?optionalValue .}}
            }}
            FILTER (isURI(?predicate))
        }}
        GROUP BY ?predicate ?datatype ?maxCount ?minCount ?hasValue
    """

    query = prepareQuery(
        query_string,
        initNs={
            "sh": "http://www.w3.org/ns/shacl#",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        },
    )

    # Materialise the result: SPARQL iterators can be misleading about
    # their emptiness.
    results_list = list(shacl.query(query))

    # No rows means no shape is defined for this class, so everything is
    # allowed -- behave as if there were no SHACL at all.
    if not results_list:
        return unconstrained_result()

    can_be_added = set()
    can_be_deleted = set()
    mandatory_values = defaultdict(list)
    datatypes = defaultdict(list)
    optional_values = dict()
    declared_predicates = set()

    for row in results_list:
        predicate = str(row.predicate)
        declared_predicates.add(predicate)

        datatypes[predicate].append(
            str(row.datatype) if row.datatype else str(XSD.string)
        )
        optional_values.setdefault(predicate, list()).extend(
            row.optionalValues.split(",") if row.optionalValues else []
        )

        if row.hasValue:
            # A fixed value is mandatory: the predicate is neither freely
            # addable nor deletable.
            mandatory_values[predicate].append(str(row.hasValue))
            continue

        current_count = predicate_counts.get(predicate, 0)
        min_count = 0 if row.minCount is None else int(row.minCount)
        max_count = None if row.maxCount is None else int(row.maxCount)

        if max_count is None or max_count > current_count:
            can_be_added.add(predicate)
        # Deleting is forbidden only when the entity sits exactly at the
        # minimum cardinality.
        if min_count != current_count:
            can_be_deleted.add(predicate)

    return (
        list(can_be_added),
        list(can_be_deleted),
        dict(datatypes),
        mandatory_values,
        optional_values,
        declared_predicates,
    )
def _coerce_unconstrained_value(new_value, old_value, fallback_datatype=None):
    """Build the RDF term for ``new_value`` when no SHACL constraint applies."""
    if validators.url(new_value):
        return URIRef(new_value)
    # Preserve the datatype of the value being replaced when it has one.
    if isinstance(old_value, Literal) and old_value.datatype:
        return Literal(new_value, datatype=old_value.datatype)
    return Literal(new_value, datatype=fallback_datatype)


def _type_mismatch_message(custom_filter, new_value, predicate, label_context, type_labels):
    """Return the localized "requires values of type ..." error message."""
    return gettext(
        "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
        new_value=custom_filter.human_readable_predicate(new_value, label_context),
        property=custom_filter.human_readable_predicate(predicate, label_context),
        o_types=", ".join(f"<code>{label}</code>" for label in type_labels),
    )


def validate_new_triple(
    subject, predicate, new_value, action: str, old_value=None, entity_types=None, entity_shape=None
):
    """Validate a create/update/delete of one triple against the SHACL shapes.

    Args:
        subject: URI of the entity being edited.
        predicate: URI of the property being edited.
        new_value: Proposed value, as a string.
        action: ``"create"`` or ``"delete"``; any other value is treated as
            an update.
        old_value: Value being replaced or removed, if any. When a value with
            the same string form exists in the data graph, that stored term
            (with its datatype) is used instead.
        entity_types: Type or types to use when the subject has no
            ``rdf:type`` in the data graph.
        entity_shape: Accepted for interface compatibility; not used here.

    Returns:
        A tuple ``(valid_value, old_value, error_message)``. On success
        ``error_message`` is ``""`` and ``valid_value`` is the ``URIRef`` or
        ``Literal`` to store (``None`` for a delete). On failure
        ``valid_value`` is ``None`` and ``error_message`` explains why.
    """
    data_graph = fetch_data_graph_for_subject(subject)

    if old_value is not None:
        stored_value = next(
            (
                triple[2]
                for triple in data_graph.triples((URIRef(subject), URIRef(predicate), None))
                if str(triple[2]) == str(old_value)
            ),
            None,
        )
        # Only replace old_value when a match was found in the graph.
        if stored_value is not None:
            old_value = stored_value

    shacl = get_shacl_graph()
    # Without SHACL any value is accepted. Guard against a missing graph
    # before taking its length.
    if shacl is None or not len(shacl):
        return _coerce_unconstrained_value(new_value, old_value), old_value, ""

    subject_ref = URIRef(subject)
    predicate_ref = URIRef(predicate)

    s_types = [
        triple[2] for triple in data_graph.triples((subject_ref, RDF.type, None))
    ]
    # NOTE(review): computed before the entity_types fallback below, so a
    # subject with no stored type contributes no class to the labels -- confirm
    # that this is intended.
    highest_priority_class = get_highest_priority_class(s_types)

    if entity_types and not s_types:
        # Copy so that extending s_types below never mutates the caller's list.
        if isinstance(entity_types, (list, tuple, set)):
            s_types = list(entity_types)
        else:
            s_types = [entity_types]

    # Also collect the types of the entities that reference this subject.
    # Constraints can depend on that context. Example: an identifier is always
    # a datacite:Identifier, but its format depends on its owner:
    #   * a DOI for an article follows one pattern
    #   * an ISSN for a journal follows another
    #   * an ORCID for a person follows yet another
    for referrer, _, _ in data_graph.triples((None, None, subject_ref)):
        s_types.extend(
            triple[2] for triple in data_graph.triples((referrer, RDF.type, None))
        )

    type_values = "> <".join(s_types)
    query = f"""
        PREFIX sh: <http://www.w3.org/ns/shacl#>
        SELECT DISTINCT ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
            (GROUP_CONCAT(DISTINCT COALESCE(?optionalValue, ""); separator=",") AS ?optionalValues)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionPath, ""); separator=",") AS ?conditionPaths)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionValue, ""); separator=",") AS ?conditionValues)
        WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?propertyShape .
            ?propertyShape sh:path ?path .
            FILTER(?path = <{predicate}>)
            VALUES ?type {{<{type_values}>}}
            OPTIONAL {{?propertyShape sh:datatype ?datatype .}}
            OPTIONAL {{?propertyShape sh:maxCount ?maxCount .}}
            OPTIONAL {{?propertyShape sh:minCount ?minCount .}}
            OPTIONAL {{?propertyShape sh:class ?a_class .}}
            OPTIONAL {{
                ?propertyShape sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
                OPTIONAL {{?orConstraint sh:class ?class .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:classIn ?classInList .
                ?classInList rdf:rest*/rdf:first ?classIn .
            }}
            OPTIONAL {{
                ?propertyShape sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?propertyShape sh:pattern ?pattern .
                OPTIONAL {{?propertyShape sh:message ?message .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:condition ?conditionNode .
                ?conditionNode sh:path ?conditionPath ;
                               sh:hasValue ?conditionValue .
            }}
        }}
        GROUP BY ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
    """
    custom_filter = get_custom_filter()

    # Materialise the result: SPARQL iterators can be misleading about
    # their emptiness.
    results_list = list(shacl.query(query))
    shapes = [row.shape for row in results_list if row.shape is not None]
    current_shape = shapes[0] if shapes else None

    if not results_list:
        if not s_types:
            return (
                None,
                old_value,
                gettext(
                    "No entity type specified"
                ),
            )
        # No shape constrains this property for these classes: everything
        # is allowed, as if there were no SHACL.
        return (
            _coerce_unconstrained_value(new_value, old_value, XSD.string),
            old_value,
            "",
        )

    label_context = (highest_priority_class, current_shape)

    datatypes = [row.datatype for row in results_list if row.datatype is not None]
    classes = [row.a_class for row in results_list if row.a_class]
    classes.extend([row.classIn for row in results_list if row.classIn])
    optional_values_str = [row.optionalValues for row in results_list if row.optionalValues]
    optional_values_str = optional_values_str[0] if optional_values_str else ""
    optional_values = [value for value in optional_values_str.split(",") if value]

    # sh:maxCount 0 is a real constraint, but a zero-valued literal is falsy,
    # so test for presence rather than truthiness.
    max_count = [row.maxCount for row in results_list if row.maxCount is not None]
    # sh:minCount 0 is equivalent to no minimum, so truthiness is fine here.
    min_count = [row.minCount for row in results_list if row.minCount]
    max_count = int(max_count[0]) if max_count else None
    min_count = int(min_count[0]) if min_count else None

    current_count = len(list(data_graph.triples((subject_ref, predicate_ref, None))))

    if action == "create":
        new_count = current_count + 1
    elif action == "delete":
        new_count = current_count - 1
    else:  # update
        new_count = current_count

    if max_count is not None and new_count > max_count:
        value = gettext("value") if max_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s allows at most %(max_count)s %(value)s",
                predicate=custom_filter.human_readable_predicate(predicate, label_context),
                max_count=max_count,
                value=value,
            ),
        )
    if min_count is not None and new_count < min_count:
        value = gettext("value") if min_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s requires at least %(min_count)s %(value)s",
                predicate=custom_filter.human_readable_predicate(predicate, label_context),
                min_count=min_count,
                value=value,
            ),
        )

    # A delete only has to satisfy the cardinality constraints checked above;
    # the datatype or class of the removed value is irrelevant.
    if action == "delete":
        return None, old_value, ""

    if optional_values and new_value not in optional_values:
        optional_value_labels = [
            custom_filter.human_readable_predicate(value, label_context)
            for value in optional_values
        ]
        return (
            None,
            old_value,
            gettext(
                "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires one of the following values: %(o_values)s",
                new_value=custom_filter.human_readable_predicate(new_value, label_context),
                property=custom_filter.human_readable_predicate(predicate, label_context),
                o_values=", ".join(
                    [f"<code>{label}</code>" for label in optional_value_labels]
                ),
            ),
        )

    # Pattern constraints, each optionally guarded by sh:condition triples.
    for row in results_list:
        if not row.pattern:
            continue
        condition_paths = row.conditionPaths.split(",") if row.conditionPaths else []
        condition_values = row.conditionValues.split(",") if row.conditionValues else []
        # The pattern applies only when every condition triple exists.
        conditions_met = all(
            any(data_graph.triples((subject_ref, URIRef(path), URIRef(value))))
            for path, value in zip(condition_paths, condition_values)
            if path and value
        )
        if not conditions_met:
            continue
        pattern = str(row.pattern)
        # sh:pattern has SPARQL REGEX semantics: the expression may match
        # anywhere unless the pattern anchors itself, hence search, not match.
        if not re.search(pattern, new_value):
            error_message = str(row.message) if row.message else f"Value must match pattern: {pattern}"
            return None, old_value, error_message

    if classes:
        class_labels = [
            custom_filter.human_readable_class((c, current_shape)) for c in classes
        ]
        if not validators.url(new_value):
            return (
                None,
                old_value,
                _type_mismatch_message(
                    custom_filter, new_value, predicate, label_context, class_labels
                ),
            )
        valid_value = convert_to_matching_class(
            new_value, classes, entity_types=s_types
        )
        if valid_value is None:
            return (
                None,
                old_value,
                _type_mismatch_message(
                    custom_filter, new_value, predicate, label_context, class_labels
                ),
            )
        return valid_value, old_value, ""
    elif datatypes:
        valid_value = convert_to_matching_literal(new_value, datatypes)
        if valid_value is None:
            datatype_labels = [get_datatype_label(dt) for dt in datatypes]
            return (
                None,
                old_value,
                _type_mismatch_message(
                    custom_filter, new_value, predicate, label_context, datatype_labels
                ),
            )
        return valid_value, old_value, ""

    # Neither datatypes nor classes are specified: infer the term type from
    # old_value and new_value.
    if isinstance(old_value, Literal):
        valid_value = Literal(new_value, datatype=old_value.datatype or XSD.string)
    elif isinstance(old_value, URIRef):
        # old_value is a URIRef but there is no new value: keep old_value.
        if new_value is None:
            return old_value, old_value, ""
        valid_value = URIRef(new_value)
    elif new_value is not None and validators.url(new_value):
        valid_value = URIRef(new_value)
    else:
        valid_value = Literal(new_value, datatype=XSD.string)
    return valid_value, old_value, ""
def convert_to_matching_class(object_value, classes, entity_types=None):
    """Return ``object_value`` as a ``URIRef`` if it satisfies a class constraint.

    Args:
        object_value: Candidate object; must be a valid URL.
        classes: Classes allowed by the SHACL shape.
        entity_types: Optional type or types supplied by the caller.

    Returns:
        ``URIRef(object_value)`` when one of the object's ``rdf:type`` values
        is in ``classes`` or when ``entity_types`` is supplied; ``None`` when
        ``classes`` is empty, ``object_value`` is ``None`` or not a URL, or no
        type matches.
    """
    if not classes or object_value is None:
        return None

    if not validators.url(str(object_value)):
        return None

    object_ref = URIRef(object_value)
    data_graph = fetch_data_graph_for_subject(object_value)

    # NOTE(review): a truthy entity_types has always made this function accept
    # the value whether or not a class matches (the original code carried an
    # explicit "special case" for it). That behaviour is kept, but it bypasses
    # the class check -- confirm it is still wanted.
    if entity_types:
        return object_ref

    # Compare plain strings on both sides: rdflib terms and str never compare
    # equal to each other.
    o_types = {str(triple[2]) for triple in data_graph.triples((object_ref, RDF.type, None))}
    classes_str = {str(c) for c in classes}
    if o_types & classes_str:
        return object_ref

    return None
def convert_to_matching_literal(object_value, datatypes):
    """Return ``object_value`` as a ``Literal`` typed with an accepted datatype.

    Each datatype is tried in order, using the validation function registered
    for it in ``DATATYPE_MAPPING``; the first one that accepts the value wins.
    A datatype with no registered validation function cannot be checked, so if
    nothing else matched and at least one such datatype was listed, the value
    is returned as an ``xsd:string`` literal.

    Args:
        object_value: The raw value to convert.
        datatypes: Datatype URIs allowed for the value.

    Returns:
        The typed ``Literal``, or ``None`` when ``datatypes`` is empty,
        ``object_value`` is ``None``, or every known datatype rejects it.
    """
    if not datatypes or object_value is None:
        return None

    # First entry wins for a given datatype URI.
    validation_funcs = {}
    for entry in DATATYPE_MAPPING:
        validation_funcs.setdefault(str(entry[0]), entry[1])

    has_unverifiable_datatype = False
    for datatype in datatypes:
        validation_func = validation_funcs.get(str(datatype))
        if validation_func is None:
            # Keep going: a later datatype may validate the value precisely.
            has_unverifiable_datatype = True
            continue
        if validation_func(object_value):
            return Literal(object_value, datatype=datatype)

    if has_unverifiable_datatype:
        return Literal(object_value, datatype=XSD.string)
    return None
def get_datatype_label(datatype_uri):
    """Return a human-readable label for a datatype URI.

    Lookup order: a built-in table of common XSD datatypes, then the labels in
    ``DATATYPE_MAPPING``, then the application's custom filter.

    Args:
        datatype_uri: The datatype URI, or ``None``.

    Returns:
        ``None`` for ``None`` input; otherwise the label found, or
        ``datatype_uri`` itself when no better label is available.
    """
    if datatype_uri is None:
        return None

    uri = str(datatype_uri)

    # Common XSD datatypes and their human-readable labels.
    datatype_labels = {
        str(XSD.string): "String",
        str(XSD.integer): "Integer",
        str(XSD.int): "Integer",
        str(XSD.float): "Float",
        str(XSD.double): "Double",
        str(XSD.decimal): "Decimal",
        str(XSD.boolean): "Boolean",
        str(XSD.date): "Date",
        str(XSD.time): "Time",
        str(XSD.dateTime): "DateTime",
        str(XSD.anyURI): "URI",
    }
    if uri in datatype_labels:
        return datatype_labels[uri]

    for dt_uri, _, dt_label in DATATYPE_MAPPING:
        if str(dt_uri) == uri:
            return dt_label

    # Last resort: ask the custom filter for a label.
    custom_filter = get_custom_filter()
    if custom_filter:
        custom_label = custom_filter.human_readable_predicate(datatype_uri, (None, None))
        # A label that is merely the tail of the URI adds nothing; return the
        # full URI instead.
        if custom_label and custom_label != datatype_uri and uri.endswith(custom_label):
            return datatype_uri
        return custom_label
    return datatype_uri