Coverage for heritrace/utils/shacl_validation.py: 92%
197 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-06-24 11:39 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-06-24 11:39 +0000
1import re
2from collections import defaultdict
3from typing import Dict, List, Optional, Tuple, Union
5import validators
6from flask_babel import gettext
7from heritrace.extensions import get_custom_filter, get_shacl_graph
8from heritrace.utils.sparql_utils import fetch_data_graph_for_subject
9from heritrace.utils.display_rules_utils import get_highest_priority_class
10from rdflib import RDF, XSD, Literal, URIRef
11from rdflib.plugins.sparql import prepareQuery
12from resources.datatypes import DATATYPE_MAPPING
def get_valid_predicates(
    triples: List[Tuple[URIRef, URIRef, Union[URIRef, Literal]]],
    highest_priority_class: URIRef
) -> Tuple[List[URIRef], List[URIRef], Dict, Dict, Dict, List[str]]:
    """Work out which predicates may be added to or removed from an entity.

    The SHACL graph is queried for every property shape attached to a node
    shape that targets ``highest_priority_class``; the constraints found are
    compared against how often each predicate already occurs in ``triples``.

    Args:
        triples: The ``(subject, predicate, object)`` triples currently
            describing the entity.
        highest_priority_class: Class URI interpolated into the query as the
            ``sh:targetClass`` to look up.

    Returns:
        A six-element tuple:

        1. predicates that can still be added (``sh:maxCount`` not reached),
        2. predicates that can be deleted (current count differs from
           ``sh:minCount``, which defaults to 0),
        3. mapping predicate -> datatypes (``xsd:string`` when the shape
           declares none),
        4. mapping predicate -> list of ``sh:hasValue`` values,
        5. mapping predicate -> list of ``sh:in`` values,
        6. the predicates known for the class.

        When the triples carry no ``rdf:type`` or the SHACL graph is
        empty/missing, nothing is constrained: elements 1, 2 and 6 are built
        from the predicates already present, element 3 maps each of them to
        ``XSD.string`` and elements 4 and 5 are empty dicts.

        NOTE: element 6 is a ``list`` on the fallback path but a ``set`` on the
        SHACL path; this long-standing shape is kept for callers.
    """
    shacl = get_shacl_graph()

    existing_predicates = [triple[1] for triple in triples]
    s_types = [triple[2] for triple in triples if triple[1] == RDF.type]

    # No declared type or no shapes: fall back to "whatever is already there".
    if not s_types or not shacl:
        default_datatypes = {
            str(predicate): XSD.string for predicate in existing_predicates
        }
        return (
            existing_predicates,
            existing_predicates,
            default_datatypes,
            dict(),
            dict(),
            [str(predicate) for predicate in existing_predicates],
        )

    # Single pass tally instead of calling list.count() once per predicate.
    predicate_counts = defaultdict(int)
    for predicate in existing_predicates:
        predicate_counts[str(predicate)] += 1

    query_string = f"""
        SELECT ?predicate ?datatype ?maxCount ?minCount ?hasValue (GROUP_CONCAT(?optionalValue; separator=",") AS ?optionalValues) WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?property .
            VALUES ?type {{<{highest_priority_class}>}}
            ?property sh:path ?predicate .
            OPTIONAL {{?property sh:datatype ?datatype .}}
            OPTIONAL {{?property sh:maxCount ?maxCount .}}
            OPTIONAL {{?property sh:minCount ?minCount .}}
            OPTIONAL {{?property sh:hasValue ?hasValue .}}
            OPTIONAL {{
                ?property sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?property sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
            }}
            FILTER (isURI(?predicate))
        }}
        GROUP BY ?predicate ?datatype ?maxCount ?minCount ?hasValue
    """

    query = prepareQuery(
        query_string,
        initNs={
            "sh": "http://www.w3.org/ns/shacl#",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        },
    )
    rows = list(shacl.query(query))

    can_be_added = set()
    can_be_deleted = set()
    mandatory_values = defaultdict(list)
    datatypes = defaultdict(list)
    optional_values = dict()
    valid_predicates = set()

    for row in rows:
        predicate = str(row.predicate)
        valid_predicates.add(predicate)
        current_count = predicate_counts.get(predicate, 0)
        # A missing sh:minCount means the property is optional.
        min_count = 0 if row.minCount is None else int(row.minCount)

        if row.hasValue:
            # A fixed value is reported as mandatory and is never offered for
            # free addition or deletion.
            mandatory_values[predicate].append(str(row.hasValue))
        else:
            max_reached = (
                row.maxCount is not None and int(row.maxCount) <= current_count
            )
            if not max_reached:
                can_be_added.add(predicate)
            # Deletion is blocked only when the entity holds exactly the
            # minimum number of values.
            if min_count != current_count:
                can_be_deleted.add(predicate)

        datatypes[predicate].append(
            str(row.datatype) if row.datatype else str(XSD.string)
        )
        optional_values.setdefault(predicate, list()).extend(
            row.optionalValues.split(",") if row.optionalValues else []
        )

    return (
        list(can_be_added),
        list(can_be_deleted),
        dict(datatypes),
        mandatory_values,
        optional_values,
        valid_predicates,
    )
def validate_new_triple(
    subject, predicate, new_value, action: str, old_value=None, entity_types=None
):
    """Validate a proposed change to a single triple against the SHACL shapes.

    Args:
        subject: URI of the entity being edited.
        predicate: URI of the property being created, updated or deleted.
        new_value: Raw value submitted for the property.
        action: ``"create"`` or ``"delete"``; any other value is handled as an
            update (the number of values stays the same).
        old_value: Value being replaced, if any. When a value with the same
            string form exists in the data graph, that graph term is used
            instead so its datatype is available.
        entity_types: Type or list of types used only when the data graph
            holds no ``rdf:type`` for ``subject``. The list is copied, never
            modified.

    Returns:
        A ``(valid_value, old_value, error_message)`` tuple. On success
        ``valid_value`` is a ``URIRef``/``Literal`` (``None`` for a valid
        delete) and ``error_message`` is ``""``. On failure ``valid_value`` is
        ``None`` and ``error_message`` describes the violated constraint.
    """
    data_graph = fetch_data_graph_for_subject(subject)
    if old_value is not None:
        # Prefer the term stored in the graph; keep the raw value when there
        # is no match.
        old_value = next(
            (
                obj
                for _, _, obj in data_graph.triples(
                    (URIRef(subject), URIRef(predicate), None)
                )
                if str(obj) == str(old_value)
            ),
            old_value,
        )

    shacl = get_shacl_graph()
    if not len(shacl):
        # Without SHACL any value is accepted; keep the old literal's
        # datatype when there is one.
        if validators.url(new_value):
            return URIRef(new_value), old_value, ""
        if (
            old_value is not None
            and isinstance(old_value, Literal)
            and old_value.datatype
        ):
            return Literal(new_value, datatype=old_value.datatype), old_value, ""
        return Literal(new_value), old_value, ""

    s_types = [
        triple[2] for triple in data_graph.triples((URIRef(subject), RDF.type, None))
    ]
    # NOTE(review): this is computed before the entity_types fallback below,
    # so it only reflects types stored in the graph -- confirm this is intended.
    highest_priority_class = get_highest_priority_class(s_types)

    if entity_types and not s_types:
        # Copy the list: s_types is extended below and the caller's argument
        # must not be modified as a side effect.
        if isinstance(entity_types, list):
            s_types = list(entity_types)
        else:
            s_types = [entity_types]

    # Also collect the types of every entity that points at this subject.
    # Constraints can depend on that context. Example: an identifier is always
    # a datacite:Identifier, but the allowed format of its value depends on
    # what owns it (a DOI for an article, an ISSN for a journal, an ORCID for
    # a person).
    inverse_types = []
    for owner, _, _ in data_graph.triples((None, None, URIRef(subject))):
        inverse_types.extend(
            t[2] for t in data_graph.triples((owner, RDF.type, None))
        )
    s_types.extend(inverse_types)

    type_values = "> <".join(s_types)
    query = f"""
        PREFIX sh: <http://www.w3.org/ns/shacl#>
        SELECT DISTINCT ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
            (GROUP_CONCAT(DISTINCT COALESCE(?optionalValue, ""); separator=",") AS ?optionalValues)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionPath, ""); separator=",") AS ?conditionPaths)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionValue, ""); separator=",") AS ?conditionValues)
        WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?propertyShape .
            ?propertyShape sh:path ?path .
            FILTER(?path = <{predicate}>)
            VALUES ?type {{<{type_values}>}}
            OPTIONAL {{?propertyShape sh:datatype ?datatype .}}
            OPTIONAL {{?propertyShape sh:maxCount ?maxCount .}}
            OPTIONAL {{?propertyShape sh:minCount ?minCount .}}
            OPTIONAL {{?propertyShape sh:class ?a_class .}}
            OPTIONAL {{
                ?propertyShape sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
                OPTIONAL {{?orConstraint sh:class ?class .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:classIn ?classInList .
                ?classInList rdf:rest*/rdf:first ?classIn .
            }}
            OPTIONAL {{
                ?propertyShape sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?propertyShape sh:pattern ?pattern .
                OPTIONAL {{?propertyShape sh:message ?message .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:condition ?conditionNode .
                ?conditionNode sh:path ?conditionPath ;
                               sh:hasValue ?conditionValue .
            }}
        }}
        GROUP BY ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
    """
    custom_filter = get_custom_filter()
    rows = list(shacl.query(query))

    current_shape = next((row.shape for row in rows if row.shape is not None), None)
    label_context = (highest_priority_class, current_shape)

    def readable(uri):
        """Return the display label of ``uri`` in the current entity context."""
        return custom_filter.human_readable_predicate(uri, label_context)

    def wrong_type_error(type_labels):
        """Build the failure tuple for a value of the wrong class/datatype."""
        return (
            None,
            old_value,
            gettext(
                "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
                new_value=readable(new_value),
                property=readable(predicate),
                o_types=", ".join(f"<code>{label}</code>" for label in type_labels),
            ),
        )

    # No row at all means no shape declares this predicate for these types.
    if not rows:
        if not s_types:
            return (
                None,
                old_value,
                gettext(
                    "No entity type specified"
                ),
            )

        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s is not allowed for resources of type %(s_type)s",
                predicate=readable(predicate),
                s_type=custom_filter.human_readable_class(label_context),
            ),
        )

    datatypes = [row.datatype for row in rows if row.datatype is not None]
    classes = [row.a_class for row in rows if row.a_class]
    classes.extend(row.classIn for row in rows if row.classIn)

    # Only the first non-empty sh:in list is considered.
    optional_values_str = next(
        (row.optionalValues for row in rows if row.optionalValues), ""
    )
    optional_values = [value for value in optional_values_str.split(",") if value]

    max_count = next((int(row.maxCount) for row in rows if row.maxCount), None)
    min_count = next((int(row.minCount) for row in rows if row.minCount), None)

    current_count = len(
        list(data_graph.triples((URIRef(subject), URIRef(predicate), None)))
    )
    if action == "create":
        new_count = current_count + 1
    elif action == "delete":
        new_count = current_count - 1
    else:  # update
        new_count = current_count

    if max_count is not None and new_count > max_count:
        value = gettext("value") if max_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s allows at most %(max_count)s %(value)s",
                predicate=readable(predicate),
                max_count=max_count,
                value=value,
            ),
        )
    if min_count is not None and new_count < min_count:
        value = gettext("value") if min_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s requires at least %(min_count)s %(value)s",
                predicate=readable(predicate),
                min_count=min_count,
                value=value,
            ),
        )

    # A delete only has to respect the cardinality constraints checked above;
    # the datatype or class of the removed value is irrelevant.
    if action == "delete":
        return None, old_value, ""

    if optional_values and new_value not in optional_values:
        optional_value_labels = [readable(value) for value in optional_values]
        return (
            None,
            old_value,
            gettext(
                "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires one of the following values: %(o_values)s",
                new_value=readable(new_value),
                property=readable(predicate),
                o_values=", ".join(
                    [f"<code>{label}</code>" for label in optional_value_labels]
                ),
            ),
        )

    # Pattern constraints, optionally guarded by sh:condition nodes.
    for row in rows:
        if not row.pattern:
            continue

        condition_paths = row.conditionPaths.split(",") if row.conditionPaths else []
        condition_values = (
            row.conditionValues.split(",") if row.conditionValues else []
        )

        # The pattern applies only when every (path, value) condition is
        # present on the subject in the data graph.
        conditions_met = True
        for path, value in zip(condition_paths, condition_values):
            if path and value:
                condition_exists = any(
                    data_graph.triples((URIRef(subject), URIRef(path), URIRef(value)))
                )
                if not condition_exists:
                    conditions_met = False
                    break

        if conditions_met:
            pattern = str(row.pattern)
            # NOTE(review): re.match anchors at the start of the string, while
            # SHACL evaluates sh:pattern with SPARQL REGEX (unanchored).
            # Patterns without a leading ^ are therefore stricter here.
            if not re.match(pattern, new_value):
                error_message = str(row.message) if row.message else f"Value must match pattern: {pattern}"
                return None, old_value, error_message

    if classes:
        class_labels = [
            custom_filter.human_readable_class((c, current_shape)) for c in classes
        ]
        if not validators.url(new_value):
            return wrong_type_error(class_labels)
        valid_value = convert_to_matching_class(
            new_value, classes, entity_types=s_types
        )
        if valid_value is None:
            return wrong_type_error(class_labels)
        return valid_value, old_value, ""
    elif datatypes:
        valid_value = convert_to_matching_literal(new_value, datatypes)
        if valid_value is None:
            return wrong_type_error([get_datatype_label(dt) for dt in datatypes])
        return valid_value, old_value, ""

    # Neither datatypes nor classes are specified: derive the kind of the new
    # term from old_value and new_value.
    if isinstance(old_value, Literal):
        if old_value.datatype:
            valid_value = Literal(new_value, datatype=old_value.datatype)
        else:
            valid_value = Literal(new_value, datatype=XSD.string)
    elif isinstance(old_value, URIRef):
        # old_value is a URIRef but there is no new value: keep old_value.
        if new_value is None:
            return old_value, old_value, ""
        valid_value = URIRef(new_value)
    elif new_value is not None and validators.url(new_value):
        valid_value = URIRef(new_value)
    else:
        valid_value = Literal(new_value, datatype=XSD.string)
    return valid_value, old_value, ""
def convert_to_matching_class(object_value, classes, entity_types=None):
    """Return ``object_value`` as a ``URIRef`` when it satisfies a class constraint.

    Args:
        object_value: Candidate object; it must be a valid URL.
        classes: Classes accepted by the property shape.
        entity_types: Optional type or list of types used in place of the
            object's own ``rdf:type`` values when the data graph has none.

    Returns:
        ``URIRef(object_value)`` when the value is accepted, otherwise ``None``
        (no classes given, ``None`` value, not a URL, or no class match).
    """
    if object_value is None or not classes:
        return None
    if not validators.url(str(object_value)):
        return None

    # Types declared for the object in its own data graph, as plain strings.
    graph = fetch_data_graph_for_subject(object_value)
    found_types = {
        str(obj) for _, _, obj in graph.triples((URIRef(object_value), RDF.type, None))
    }

    # Fall back to the supplied types only when the graph declares none.
    if entity_types and not found_types:
        found_types = (
            set(entity_types) if isinstance(entity_types, list) else {entity_types}
        )

    accepted = {str(c) for c in classes}
    if found_types & accepted:
        return URIRef(object_value)

    # NOTE(review): when entity_types is supplied the value is accepted even
    # without any class match, which effectively bypasses the class check for
    # such callers -- confirm this is intended outside of tests.
    if entity_types:
        return URIRef(object_value)

    return None
def convert_to_matching_literal(object_value, datatypes):
    """Build a ``Literal`` typed with the first datatype that accepts the value.

    Each datatype is looked up in ``DATATYPE_MAPPING`` (entries are read as
    ``(uri, validation_function, ...)``) and its validation function is applied
    to ``object_value``.

    Args:
        object_value: Raw value to convert.
        datatypes: Candidate datatype URIs, tried in order.

    Returns:
        * ``Literal(object_value, datatype=<dt>)`` for the first known datatype
          whose validator accepts the value;
        * otherwise ``Literal(object_value, datatype=XSD.string)`` when at
          least one candidate has no validator in ``DATATYPE_MAPPING`` (the
          value cannot be checked, so it is kept as a string);
        * ``None`` when ``datatypes`` is empty, ``object_value`` is ``None``,
          or every candidate is known and rejects the value.
    """
    if not datatypes or object_value is None:
        return None

    has_unknown_datatype = False
    for datatype in datatypes:
        validation_func = next(
            (
                entry[1]
                for entry in DATATYPE_MAPPING
                if str(entry[0]) == str(datatype)
            ),
            None,
        )
        if validation_func is None:
            # Defer the string fallback: a later, known datatype may still
            # match, and the result must not depend on the candidates' order.
            has_unknown_datatype = True
            continue
        if validation_func(object_value):
            return Literal(object_value, datatype=datatype)

    if has_unknown_datatype:
        return Literal(object_value, datatype=XSD.string)
    return None
def get_datatype_label(datatype_uri):
    """Return a human-readable label for a datatype URI.

    Lookup order: a built-in table of common XSD datatypes, then the labels in
    ``DATATYPE_MAPPING``, then the application's custom filter. ``None`` is
    returned unchanged.
    """
    if datatype_uri is None:
        return None

    uri_text = str(datatype_uri)

    # Built-in labels for the most common XSD datatypes.
    xsd_labels = {
        str(XSD.string): "String",
        str(XSD.integer): "Integer",
        str(XSD.int): "Integer",
        str(XSD.float): "Float",
        str(XSD.double): "Double",
        str(XSD.decimal): "Decimal",
        str(XSD.boolean): "Boolean",
        str(XSD.date): "Date",
        str(XSD.time): "Time",
        str(XSD.dateTime): "DateTime",
        str(XSD.anyURI): "URI"
    }
    builtin_label = xsd_labels.get(uri_text)
    if builtin_label is not None:
        return builtin_label

    # Next, the label of the first DATATYPE_MAPPING entry with this URI.
    not_found = object()
    mapped_label = next(
        (label for uri, _, label in DATATYPE_MAPPING if str(uri) == uri_text),
        not_found,
    )
    if mapped_label is not not_found:
        return mapped_label

    # Last resort: ask the custom filter, or hand the URI back when there is
    # no filter.
    custom_filter = get_custom_filter()
    if not custom_filter:
        return datatype_uri

    custom_label = custom_filter.human_readable_predicate(datatype_uri, (None, None))
    # A label that is merely the tail of the URI adds nothing; return the full
    # URI instead.
    is_uri_tail = (
        custom_label
        and custom_label != datatype_uri
        and datatype_uri.endswith(custom_label)
    )
    return datatype_uri if is_uri_tail else custom_label