Coverage for heritrace/utils/shacl_validation.py: 92%
205 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-11-26 11:33 +0000
1import re
2from collections import defaultdict
3from typing import Dict, List, Optional, Tuple, Union
5import validators
6from flask_babel import gettext
7from heritrace.extensions import get_custom_filter, get_shacl_graph
8from heritrace.utils.sparql_utils import (fetch_data_graph_for_subject,
9 get_triples_from_graph)
10from heritrace.utils.display_rules_utils import get_highest_priority_class
11from rdflib import RDF, XSD, Literal, URIRef
12from rdflib.plugins.sparql import prepareQuery
13from heritrace.utils.datatypes import DATATYPE_MAPPING
def get_valid_predicates(
    triples: List[Tuple[URIRef, URIRef, Union[URIRef, Literal]]],
    highest_priority_class: URIRef
) -> Tuple[List[str], List[str], Dict, Dict, Dict, List[str]]:
    """Work out which predicates may be added to or removed from an entity.

    The SHACL graph is queried for the property shapes whose ``sh:targetClass``
    is ``highest_priority_class``; the cardinality, ``sh:hasValue``, ``sh:in``
    and ``sh:or`` constraints found there are compared with the predicates
    already present in ``triples``.

    Args:
        triples: The (subject, predicate, object) triples of the entity.
        highest_priority_class: Class whose shapes drive the result.

    Returns:
        A 6-tuple ``(can_be_added, can_be_deleted, datatypes,
        mandatory_values, optional_values, valid_predicates)``:

        * ``can_be_added``: predicates whose ``sh:maxCount`` is not reached.
        * ``can_be_deleted``: predicates whose current count differs from
          ``sh:minCount`` (a missing ``sh:minCount`` counts as 0).
        * ``datatypes``: predicate -> list of datatype URIs (``xsd:string``
          when the shape declares none).
        * ``mandatory_values``: predicate -> list of ``sh:hasValue`` values.
        * ``optional_values``: predicate -> values from ``sh:in`` / ``sh:or``.
        * ``valid_predicates``: every predicate the shapes mention.

        When the entity has no ``rdf:type``, there is no SHACL graph, or no
        shape targets the class, everything is allowed: the three predicate
        collections are lists of the existing predicates (one entry per
        triple, so duplicates are possible), every datatype is ``xsd:string``
        and the two value dicts are empty.

        NOTE: on the constrained path the last element is a ``set`` rather
        than a ``list``; this is kept for backward compatibility.
    """
    shacl = get_shacl_graph()

    existing_predicates = [triple[1] for triple in triples]
    # Count in a single pass instead of calling list.count() per predicate.
    predicate_counts = defaultdict(int)
    for predicate in existing_predicates:
        predicate_counts[str(predicate)] += 1
    s_types = [triple[2] for triple in triples if triple[1] == RDF.type]

    def everything_allowed():
        # Shared result for every "no applicable constraint" situation.
        # Three independent lists are returned so callers can mutate one
        # without affecting the others.
        names = [str(predicate) for predicate in existing_predicates]
        return (
            names,
            list(names),
            {name: XSD.string for name in names},
            dict(),
            dict(),
            list(names),
        )

    if not s_types or not shacl:
        return everything_allowed()

    query_string = f"""
        SELECT ?predicate ?datatype ?maxCount ?minCount ?hasValue (GROUP_CONCAT(?optionalValue; separator=",") AS ?optionalValues) WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?property .
            VALUES ?type {{<{highest_priority_class}>}}
            ?property sh:path ?predicate .
            OPTIONAL {{?property sh:datatype ?datatype .}}
            OPTIONAL {{?property sh:maxCount ?maxCount .}}
            OPTIONAL {{?property sh:minCount ?minCount .}}
            OPTIONAL {{?property sh:hasValue ?hasValue .}}
            OPTIONAL {{
                ?property sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?property sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                OPTIONAL {{?orConstraint sh:datatype ?datatype .}}
                OPTIONAL {{?orConstraint sh:hasValue ?optionalValue .}}
            }}
            FILTER (isURI(?predicate))
        }}
        GROUP BY ?predicate ?datatype ?maxCount ?minCount ?hasValue
    """

    query = prepareQuery(
        query_string,
        initNs={
            "sh": "http://www.w3.org/ns/shacl#",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        },
    )

    # Materialise the result: SPARQL result iterators can be misleading about
    # their emptiness.
    results_list = list(shacl.query(query))

    # No rows means no shape is defined for this class, so behave as if there
    # were no SHACL at all.
    if not results_list:
        return everything_allowed()

    # One (predicate, constraints) pair per result row; a predicate appears
    # more than once when the query yields several rows for it.
    constraints = [
        (
            str(row.predicate),
            {
                "min": 0 if row.minCount is None else int(row.minCount),
                "max": None if row.maxCount is None else int(str(row.maxCount)),
                "hasValue": row.hasValue,
                "optionalValues": (
                    row.optionalValues.split(",") if row.optionalValues else []
                ),
            },
        )
        for row in results_list
    ]

    can_be_added = set()
    can_be_deleted = set()
    mandatory_values = defaultdict(list)
    for predicate, ranges in constraints:
        # Compare with None explicitly: an rdflib literal such as false, 0 or
        # "" is falsy but is still a real sh:hasValue constraint.
        if ranges["hasValue"] is not None:
            mandatory_values[predicate].append(str(ranges["hasValue"]))
            continue

        current_count = predicate_counts.get(predicate, 0)
        max_reached = ranges["max"] is not None and ranges["max"] <= current_count
        if not max_reached:
            can_be_added.add(predicate)
        # Sitting exactly on the minimum means nothing more may be removed.
        if ranges["min"] != current_count:
            can_be_deleted.add(predicate)

    datatypes = defaultdict(list)
    for row in results_list:
        datatypes[str(row.predicate)].append(
            str(row.datatype) if row.datatype else str(XSD.string)
        )

    optional_values = dict()
    for predicate, ranges in constraints:
        optional_values.setdefault(predicate, list()).extend(
            ranges["optionalValues"]
        )

    return (
        list(can_be_added),
        list(can_be_deleted),
        dict(datatypes),
        mandatory_values,
        optional_values,
        {predicate for predicate, _ in constraints},
    )
def _unconstrained_term(new_value, old_value, default_datatype=None):
    """Build the RDF term for a value that no SHACL property shape constrains.

    URL-like values become ``URIRef``. Anything else becomes a ``Literal``
    that inherits the datatype of ``old_value`` when that is a typed literal
    and otherwise uses ``default_datatype`` (``None`` is the same as passing
    no datatype at all).
    """
    if validators.url(new_value):
        return URIRef(new_value)
    if isinstance(old_value, Literal) and old_value.datatype:
        return Literal(new_value, datatype=old_value.datatype)
    return Literal(new_value, datatype=default_datatype)


def _type_mismatch_message(custom_filter, new_value, predicate, label_context, type_labels):
    """Return the translated "requires values of type ..." error message."""
    return gettext(
        "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
        new_value=custom_filter.human_readable_predicate(new_value, label_context),
        property=custom_filter.human_readable_predicate(predicate, label_context),
        o_types=", ".join(f"<code>{label}</code>" for label in type_labels),
    )


def _pattern_conditions_met(data_graph, subject, row):
    """Tell whether every condition attached to a pattern row holds for ``subject``.

    Conditions are the comma-separated ``conditionPaths`` / ``conditionValues``
    columns paired positionally; each non-empty pair must exist as a triple
    ``(subject, path, value)`` in ``data_graph``. A row with no conditions is
    always considered satisfied.
    """
    condition_paths = row.conditionPaths.split(",") if row.conditionPaths else []
    condition_values = row.conditionValues.split(",") if row.conditionValues else []
    for path, value in zip(condition_paths, condition_values):
        if not (path and value):
            continue
        pattern = (URIRef(subject), URIRef(path), URIRef(value))
        if not any(get_triples_from_graph(data_graph, pattern)):
            return False
    return True


def validate_new_triple(
    subject, predicate, new_value, action: str, old_value=None, entity_types=None, entity_shape=None
):
    """Validate a create/update/delete of one triple against the SHACL shapes.

    Args:
        subject: URI of the entity being edited.
        predicate: URI of the property being edited.
        new_value: Raw value submitted for the property.
        action: ``"create"``, ``"delete"``; anything else is treated as an
            update (the number of values does not change).
        old_value: Value being replaced or removed, if any. When a value with
            the same string form is stored in the graph, the stored term is
            used instead so its datatype is preserved.
        entity_types: Type or list of types to fall back on when the subject
            has no ``rdf:type`` in the fetched graph. Never modified.
        entity_shape: Accepted for interface compatibility; not used here.

    Returns:
        A tuple ``(valid_value, old_value, error_message)``. On success
        ``error_message`` is ``""`` and ``valid_value`` is the ``URIRef`` or
        ``Literal`` to store (``None`` for a successful delete). On failure
        ``valid_value`` is ``None`` and ``error_message`` explains why.
    """
    data_graph = fetch_data_graph_for_subject(subject)
    subject_ref = URIRef(subject)
    predicate_ref = URIRef(predicate)

    if old_value is not None:
        matching_triples = [
            triple[2]
            for triple in get_triples_from_graph(data_graph, (subject_ref, predicate_ref, None))
            if str(triple[2]) == str(old_value)
        ]
        # Only replace old_value if a match was found in the graph.
        if matching_triples:
            old_value = matching_triples[0]

    shacl = get_shacl_graph()
    if not len(shacl):
        # Without SHACL any value is accepted; the old datatype is preserved
        # when available.
        return _unconstrained_term(new_value, old_value), old_value, ""

    s_types = [
        triple[2] for triple in get_triples_from_graph(data_graph, (subject_ref, RDF.type, None))
    ]
    # NOTE(review): this runs before the entity_types fallback below, so for a
    # subject with no stored types it receives an empty list. Confirm whether
    # the fallback types should be considered for labels as well.
    highest_priority_class = get_highest_priority_class(s_types)

    if entity_types and not s_types:
        # Copy the list: s_types is extended below and the caller's
        # entity_types must not be modified as a side effect.
        s_types = list(entity_types) if isinstance(entity_types, list) else [entity_types]

    # Also collect the types of entities that have this subject as their
    # object. Constraints can depend on that context: an identifier is always
    # a datacite:Identifier, but the format it must follow depends on what
    # owns it (a DOI for an article, an ISSN for a journal, an ORCID for a
    # person, ...).
    inverse_types = []
    for s, _p, _o in get_triples_from_graph(data_graph, (None, None, subject_ref)):
        inverse_types.extend(
            t[2] for t in get_triples_from_graph(data_graph, (s, RDF.type, None))
        )
    s_types.extend(inverse_types)

    type_values = "> <".join(s_types)
    # NOTE(review): inside the sh:or block the optional sh:class binds ?class,
    # which is never selected (the SELECT uses ?a_class). Left unchanged.
    query = f"""
        PREFIX sh: <http://www.w3.org/ns/shacl#>
        SELECT DISTINCT ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
            (GROUP_CONCAT(DISTINCT COALESCE(?optionalValue, ""); separator=",") AS ?optionalValues)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionPath, ""); separator=",") AS ?conditionPaths)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionValue, ""); separator=",") AS ?conditionValues)
        WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?propertyShape .
            ?propertyShape sh:path ?path .
            FILTER(?path = <{predicate}>)
            VALUES ?type {{<{type_values}>}}
            OPTIONAL {{?propertyShape sh:datatype ?datatype .}}
            OPTIONAL {{?propertyShape sh:maxCount ?maxCount .}}
            OPTIONAL {{?propertyShape sh:minCount ?minCount .}}
            OPTIONAL {{?propertyShape sh:class ?a_class .}}
            OPTIONAL {{
                ?propertyShape sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
                OPTIONAL {{?orConstraint sh:class ?class .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:classIn ?classInList .
                ?classInList rdf:rest*/rdf:first ?classIn .
            }}
            OPTIONAL {{
                ?propertyShape sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?propertyShape sh:pattern ?pattern .
                OPTIONAL {{?propertyShape sh:message ?message .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:condition ?conditionNode .
                ?conditionNode sh:path ?conditionPath ;
                               sh:hasValue ?conditionValue .
            }}
        }}
        GROUP BY ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
    """
    custom_filter = get_custom_filter()

    # Materialise the result: SPARQL result iterators can be misleading about
    # their emptiness.
    results_list = list(shacl.query(query))
    shapes = [row.shape for row in results_list if row.shape is not None]
    current_shape = shapes[0] if shapes else None
    # Context handed to the label helpers for every message below.
    label_context = (highest_priority_class, current_shape)

    if not results_list:
        if not s_types:
            return (
                None,
                old_value,
                gettext(
                    "No entity type specified"
                ),
            )
        # No shape constrains this property for these classes: everything is
        # allowed, as if there were no SHACL.
        return _unconstrained_term(new_value, old_value, XSD.string), old_value, ""

    datatypes = [row.datatype for row in results_list if row.datatype is not None]
    classes = [row.a_class for row in results_list if row.a_class]
    classes.extend(row.classIn for row in results_list if row.classIn)
    optional_values_str = next(
        (row.optionalValues for row in results_list if row.optionalValues), ""
    )
    optional_values = [value for value in optional_values_str.split(",") if value]

    # Compare maxCount with None explicitly: a literal 0 is falsy, and
    # "sh:maxCount 0" (no value allowed) must not be read as "no limit".
    max_counts = [row.maxCount for row in results_list if row.maxCount is not None]
    # A minCount of 0 constrains nothing, so dropping falsy values is harmless.
    min_counts = [row.minCount for row in results_list if row.minCount]
    max_count = int(max_counts[0]) if max_counts else None
    min_count = int(min_counts[0]) if min_counts else None

    current_count = len(
        list(get_triples_from_graph(data_graph, (subject_ref, predicate_ref, None)))
    )
    if action == "create":
        new_count = current_count + 1
    elif action == "delete":
        new_count = current_count - 1
    else:  # update
        new_count = current_count

    if max_count is not None and new_count > max_count:
        value = gettext("value") if max_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s allows at most %(max_count)s %(value)s",
                predicate=custom_filter.human_readable_predicate(predicate, label_context),
                max_count=max_count,
                value=value,
            ),
        )
    if min_count is not None and new_count < min_count:
        value = gettext("value") if min_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s requires at least %(min_count)s %(value)s",
                predicate=custom_filter.human_readable_predicate(predicate, label_context),
                min_count=min_count,
                value=value,
            ),
        )

    # Deletions only need the cardinality checks above; the datatype or class
    # of a value being removed is irrelevant.
    if action == "delete":
        return None, old_value, ""

    if optional_values and new_value not in optional_values:
        optional_value_labels = [
            custom_filter.human_readable_predicate(value, label_context)
            for value in optional_values
        ]
        return (
            None,
            old_value,
            gettext(
                "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires one of the following values: %(o_values)s",
                new_value=custom_filter.human_readable_predicate(new_value, label_context),
                property=custom_filter.human_readable_predicate(predicate, label_context),
                o_values=", ".join(
                    [f"<code>{label}</code>" for label in optional_value_labels]
                ),
            ),
        )

    # Pattern constraints apply only when their conditions (if any) are met.
    # NOTE(review): re.match anchors at the start of the value, whereas SHACL
    # defines sh:pattern via SPARQL REGEX (match anywhere) - confirm that the
    # shapes in use always anchor their patterns.
    for row in results_list:
        if not row.pattern:
            continue
        if not _pattern_conditions_met(data_graph, subject, row):
            continue
        pattern = str(row.pattern)
        if not re.match(pattern, new_value):
            error_message = str(row.message) if row.message else f"Value must match pattern: {pattern}"
            return None, old_value, error_message

    if classes:
        # The value must be a URL and must resolve to one of the classes.
        valid_value = None
        if validators.url(new_value):
            valid_value = convert_to_matching_class(
                new_value, classes, entity_types=s_types
            )
        if valid_value is None:
            class_labels = [
                custom_filter.human_readable_class((c, current_shape)) for c in classes
            ]
            return (
                None,
                old_value,
                _type_mismatch_message(
                    custom_filter, new_value, predicate, label_context, class_labels
                ),
            )
        return valid_value, old_value, ""

    if datatypes:
        valid_value = convert_to_matching_literal(new_value, datatypes)
        if valid_value is None:
            datatype_labels = [get_datatype_label(dt) for dt in datatypes]
            return (
                None,
                old_value,
                _type_mismatch_message(
                    custom_filter, new_value, predicate, label_context, datatype_labels
                ),
            )
        return valid_value, old_value, ""

    # Neither datatypes nor classes are specified: infer the term type from
    # old_value and new_value.
    if isinstance(old_value, Literal):
        valid_value = Literal(new_value, datatype=old_value.datatype or XSD.string)
    elif isinstance(old_value, URIRef):
        # old_value is a URIRef but there is no new value: keep old_value.
        if new_value is None:
            return old_value, old_value, ""
        valid_value = URIRef(new_value)
    elif new_value is not None and validators.url(new_value):
        valid_value = URIRef(new_value)
    else:
        valid_value = Literal(new_value, datatype=XSD.string)
    return valid_value, old_value, ""
def convert_to_matching_class(object_value, classes, entity_types=None):
    """Return ``object_value`` as a ``URIRef`` if it satisfies a class constraint.

    The object's ``rdf:type`` values are read from its data graph; when it has
    none and ``entity_types`` is given, ``entity_types`` is used in their
    place. The value is accepted when one of those types is among ``classes``.

    NOTE: when ``entity_types`` is truthy, a URL value is accepted even if no
    type matches. This mirrors the existing "special case" and is kept as is.

    Returns ``None`` when ``classes`` is empty, ``object_value`` is ``None``,
    the value is not a URL, or nothing above accepts it.
    """
    # Nothing to check against, or nothing to check.
    if not classes or object_value is None:
        return None

    # Only URLs can denote an entity of some class.
    if not validators.url(str(object_value)):
        return None

    graph = fetch_data_graph_for_subject(object_value)
    type_triples = get_triples_from_graph(
        graph, (URIRef(object_value), RDF.type, None)
    )
    found_types = {str(triple[2]) for triple in type_triples}

    # Fall back on the supplied types when the graph declares none.
    if entity_types and not found_types:
        found_types = (
            set(entity_types) if isinstance(entity_types, list) else {entity_types}
        )

    # Compare on string form.
    required = {str(required_class) for required_class in classes}
    if not found_types.isdisjoint(required):
        return URIRef(object_value)

    # No type matched: the value is still accepted whenever entity_types was
    # supplied, otherwise it is rejected.
    return URIRef(object_value) if entity_types else None
def convert_to_matching_literal(object_value, datatypes):
    """Return ``object_value`` as a ``Literal`` typed with a matching datatype.

    Each datatype in ``datatypes`` is looked up in ``DATATYPE_MAPPING`` (first
    entry whose URI has the same string form) and the value is checked with
    that entry's validation function. The first datatype that validates wins.

    If no known datatype validates the value but at least one datatype has no
    validation function, the value cannot be checked against it and is
    returned as an ``xsd:string`` literal. This fallback is applied only after
    every known datatype has been tried, so the result does not depend on the
    order of ``datatypes``.

    Returns ``None`` when ``datatypes`` is empty, ``object_value`` is ``None``,
    or every datatype is known and none validates the value.
    """
    if not datatypes or object_value is None:
        return None

    has_unknown_datatype = False
    for datatype in datatypes:
        validation_func = next(
            (d[1] for d in DATATYPE_MAPPING if str(d[0]) == str(datatype)), None
        )
        if validation_func is None:
            # Remember it, but keep looking for a datatype that can actually
            # validate the value before settling for a plain string.
            has_unknown_datatype = True
            continue
        if validation_func(object_value):
            return Literal(object_value, datatype=datatype)

    if has_unknown_datatype:
        return Literal(object_value, datatype=XSD.string)
    return None
def get_datatype_label(datatype_uri):
    """Return a human-readable label for a datatype URI.

    Lookup order: a built-in table of common XSD datatypes, then the labels in
    ``DATATYPE_MAPPING``, then the custom filter's ``human_readable_predicate``.
    If the custom filter merely returns a trailing piece of the URI, or no
    custom filter is available, the URI itself is returned. ``None`` yields
    ``None``.
    """
    if datatype_uri is None:
        return None

    uri_text = str(datatype_uri)

    # Labels for the most common XSD datatypes.
    common_labels = {
        str(XSD.string): "String",
        str(XSD.integer): "Integer",
        str(XSD.int): "Integer",
        str(XSD.float): "Float",
        str(XSD.double): "Double",
        str(XSD.decimal): "Decimal",
        str(XSD.boolean): "Boolean",
        str(XSD.date): "Date",
        str(XSD.time): "Time",
        str(XSD.dateTime): "DateTime",
        str(XSD.anyURI): "URI"
    }
    label = common_labels.get(uri_text)
    if label is not None:
        return label

    # Next, the project-wide datatype table.
    for mapped_uri, _validator, mapped_label in DATATYPE_MAPPING:
        if str(mapped_uri) == uri_text:
            return mapped_label

    # Last resort: ask the custom filter, falling back on the URI itself.
    label_filter = get_custom_filter()
    if not label_filter:
        return datatype_uri

    custom_label = label_filter.human_readable_predicate(datatype_uri, (None, None))
    # A label that is only the tail of the URI adds nothing; prefer the full
    # URI in that case.
    is_uri_tail = (
        custom_label
        and custom_label != datatype_uri
        and datatype_uri.endswith(custom_label)
    )
    return datatype_uri if is_uri_tail else custom_label