Coverage for heritrace/utils/shacl_utils.py: 94%
461 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-18 11:10 +0000
1import re
2from collections import OrderedDict, defaultdict
3from typing import List
5import validators
6from flask_babel import gettext
7from heritrace.extensions import get_custom_filter, get_shacl_graph
8from heritrace.utils.display_rules_utils import get_highest_priority_class
9from heritrace.utils.sparql_utils import fetch_data_graph_for_subject
10from rdflib import RDF, XSD, Graph, Literal, URIRef
11from rdflib.plugins.sparql import prepareQuery
12from resources.datatypes import DATATYPE_MAPPING
# Prepared once at import time and reused for every shape lookup in this module.
# For each sh:property of a shape that declares sh:targetClass, a result row
# carries the constraints selected below (node shape, datatype, cardinality,
# sh:hasValue, pattern/message, condition). Members of sh:in and node shapes
# listed under sh:or are collapsed into comma-separated strings by GROUP_CONCAT.
# Callers can bind ?shape through initBindings to restrict the query to one shape.
COMMON_SPARQL_QUERY = prepareQuery(
    """
    SELECT ?shape ?type ?predicate ?nodeShape ?datatype ?maxCount ?minCount ?hasValue ?objectClass
           ?conditionPath ?conditionValue ?pattern ?message
           (GROUP_CONCAT(?optionalValue; separator=",") AS ?optionalValues)
           (GROUP_CONCAT(?orNode; separator=",") AS ?orNodes)
    WHERE {
        ?shape sh:targetClass ?type ;
               sh:property ?property .
        ?property sh:path ?predicate .
        OPTIONAL {
            ?property sh:node ?nodeShape .
            OPTIONAL {?nodeShape sh:targetClass ?objectClass .}
        }
        OPTIONAL {
            ?property sh:or ?orList .
            {
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
            } UNION {
                ?orList rdf:rest*/rdf:first ?orNodeShape .
                ?orNodeShape sh:node ?orNode .
            }
        }
        OPTIONAL { ?property sh:datatype ?datatype . }
        OPTIONAL { ?property sh:maxCount ?maxCount . }
        OPTIONAL { ?property sh:minCount ?minCount . }
        OPTIONAL { ?property sh:hasValue ?hasValue . }
        OPTIONAL {
            ?property sh:in ?list .
            ?list rdf:rest*/rdf:first ?optionalValue .
        }
        OPTIONAL {
            ?property sh:condition ?conditionNode .
            ?conditionNode sh:path ?conditionPath ;
                           sh:hasValue ?conditionValue .
        }
        OPTIONAL { ?property sh:pattern ?pattern . }
        OPTIONAL { ?property sh:message ?message . }
        FILTER (isURI(?predicate))
    }
    GROUP BY ?shape ?type ?predicate ?nodeShape ?datatype ?maxCount ?minCount ?hasValue
             ?objectClass ?conditionPath ?conditionValue ?pattern ?message
""",
    initNs={
        "sh": "http://www.w3.org/ns/shacl#",
        "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
    },
)
def get_form_fields_from_shacl(shacl: Graph, display_rules: List[dict]):
    """
    Analyse the SHACL shapes and derive the form fields of every entity type.

    Args:
        shacl: The SHACL graph. When it is falsy an empty dict is returned.
        display_rules: Display rules used to decorate and order the fields.

    Returns:
        A mapping whose keys are entity types and whose values map each
        predicate to the list of its form-field dictionaries, as produced by
        order_form_fields.
    """
    if not shacl:
        return dict()

    # Step 1: collect the initial fields from the SHACL shapes.
    form_fields = extract_shacl_form_fields(shacl, display_rules)

    # Step 2: (re)compute the nested shape of every field that points to a node shape.
    processed_shapes = set()
    for fields_by_predicate in form_fields.values():
        for field_infos in fields_by_predicate.values():
            for field_info in field_infos:
                if not field_info.get("nodeShape"):
                    continue
                field_info["nestedShape"] = process_nested_shapes(
                    shacl,
                    display_rules,
                    field_info["nodeShape"],
                    processed_shapes=processed_shapes,
                )

    # Step 3: decorate the fields with the display rules, when there are any.
    if display_rules:
        form_fields = apply_display_rules(shacl, form_fields, display_rules)

    # Step 4: order the fields according to the display rules.
    return order_form_fields(form_fields, display_rules)
def extract_shacl_form_fields(shacl, display_rules):
    """
    Extract the form fields from the SHACL shapes.

    Runs COMMON_SPARQL_QUERY over the whole graph and hands the rows to
    process_query_results with a fresh set of processed shapes.

    Returns:
        The mapping built by process_query_results (entity type -> predicate ->
        list of field dictionaries), or an empty dict when *shacl* is falsy.
    """
    if not shacl:
        return dict()

    query_rows = execute_shacl_query(shacl, COMMON_SPARQL_QUERY)
    return process_query_results(shacl, query_rows, display_rules, set(), depth=0)
def execute_shacl_query(shacl: Graph, query, init_bindings=None):
    """
    Run a SPARQL query against the SHACL graph, optionally with initial bindings.

    Args:
        shacl (Graph): The SHACL graph to query.
        query (PreparedQuery): The prepared SPARQL query.
        init_bindings (dict): Initial variable bindings; ignored when empty or None.

    Returns:
        Result: Whatever ``shacl.query`` returns for the query.
    """
    if not init_bindings:
        return shacl.query(query)
    return shacl.query(query, initBindings=init_bindings)
def get_display_name_for_shape(entity_type, property_uri, shape_uri, display_rules):
    """
    Look up the displayName configured for a shape of a given property.

    The display rules are scanned in order: the rule's "class" must equal
    *entity_type*, one of its "displayProperties" must target *property_uri*,
    and one of that property's "displayRules" must name *shape_uri*.

    Args:
        entity_type (str): The type of the current entity
        property_uri (str): The URI of the property being processed
        shape_uri (str): The URI of the shape to match
        display_rules (list): The display rules configuration

    Returns:
        str: The "displayName" of the first matching shape rule (None when that
        rule has no display name), or None when nothing matches.
    """
    if not display_rules:
        return None

    for rule in display_rules:
        if rule.get("class") != entity_type:
            continue
        for prop in rule.get("displayProperties", []):
            if prop.get("property") != property_uri:
                continue
            for shape_rule in prop.get("displayRules", []):
                if shape_rule.get("shape") == shape_uri:
                    return shape_rule.get("displayName")
    return None
def _build_or_field_info(
    shacl,
    display_rules,
    processed_shapes,
    depth,
    *,
    entity_type,
    predicate,
    subject_shape,
    node,
    min_count,
    max_count,
    has_value,
    object_class,
    optional_values,
    condition_entry,
):
    """Build the field description for one node shape listed in an sh:or alternative."""
    or_field_info = {
        "entityType": get_shape_target_class(shacl, node),
        "uri": predicate,
        "displayName": get_display_name_for_shape(
            entity_type, predicate, node, display_rules
        ),
        "subjectShape": subject_shape,
        "nodeShape": node,
        "min": min_count,
        "max": max_count,
        "hasValue": has_value,
        "objectClass": object_class,
        "optionalValues": optional_values,
        "conditions": [condition_entry] if condition_entry else [],
    }
    # Shapes currently being expanded are skipped so that cyclic shapes terminate.
    if node not in processed_shapes:
        or_field_info["nestedShape"] = process_nested_shapes(
            shacl,
            display_rules,
            node,
            depth=depth + 1,
            processed_shapes=processed_shapes,
        )
    return or_field_info


def process_query_results(shacl, results, display_rules, processed_shapes, depth=0):
    """
    Turn the rows of COMMON_SPARQL_QUERY into form-field dictionaries.

    Rows that agree on node shape(s), subject shape, hasValue, objectClass,
    cardinality and optional values are merged into a single field: the first
    row creates it, later rows only contribute additional datatypes,
    conditions and sh:or alternatives.

    Args:
        shacl: The SHACL graph, used to resolve sh:or node shapes and nested shapes.
        results: Iterable of result rows exposing the variables selected by
            COMMON_SPARQL_QUERY as attributes.
        display_rules: Display rules, used for the display names of sh:or shapes.
        processed_shapes (set): Shapes currently being expanded (recursion guard).
        depth (int): Current nesting depth; nested shapes are processed at depth + 1.

    Returns:
        defaultdict: entity type -> predicate -> list of field dictionaries.
    """
    form_fields = defaultdict(dict)

    for row in results:
        subject_shape = str(row.shape)
        entity_type = str(row.type)
        predicate = str(row.predicate)
        node_shape = str(row.nodeShape) if row.nodeShape else None
        has_value = str(row.hasValue) if row.hasValue else None
        object_class = str(row.objectClass) if row.objectClass else None
        min_count = 0 if row.minCount is None else int(row.minCount)
        max_count = None if row.maxCount is None else int(row.maxCount)
        datatype = str(row.datatype) if row.datatype else None
        # GROUP_CONCAT yields comma-separated strings; empty chunks are dropped.
        optional_values = [v for v in (row.optionalValues or "").split(",") if v]
        or_nodes = [v for v in (row.orNodes or "").split(",") if v]

        condition_entry = {}
        if row.conditionPath and row.conditionValue:
            condition_entry["condition"] = {
                "path": str(row.conditionPath),
                "value": str(row.conditionValue),
            }
        if row.pattern:
            condition_entry["pattern"] = str(row.pattern)
        if row.message:
            condition_entry["message"] = str(row.message)

        fields_for_predicate = form_fields[entity_type].setdefault(predicate, [])

        node_shapes = [node_shape] if node_shape else []
        node_shapes.extend(or_nodes)

        # Values shared by every sh:or alternative generated from this row.
        or_field_args = {
            "entity_type": entity_type,
            "predicate": predicate,
            "subject_shape": subject_shape,
            "min_count": min_count,
            "max_count": max_count,
            "has_value": has_value,
            "optional_values": optional_values,
            "condition_entry": condition_entry,
        }

        existing_field = next(
            (
                field
                for field in fields_for_predicate
                if field.get("nodeShape") == node_shape
                and field.get("nodeShapes") == node_shapes
                and field.get("subjectShape") == subject_shape
                and field.get("hasValue") == has_value
                and field.get("objectClass") == object_class
                and field.get("min") == min_count
                and field.get("max") == max_count
                and field.get("optionalValues") == optional_values
            ),
            None,
        )

        if existing_field is not None:
            # Same field seen again: only accumulate what this row adds.
            if datatype and datatype not in existing_field.get("datatypes", []):
                existing_field.setdefault("datatypes", []).append(datatype)
            if condition_entry:
                existing_field.setdefault("conditions", []).append(condition_entry)
            if or_nodes:
                or_fields = existing_field.setdefault("or", [])
                for node in or_nodes:
                    or_fields.append(
                        _build_or_field_info(
                            shacl,
                            display_rules,
                            processed_shapes,
                            depth,
                            node=node,
                            # Here the object class is looked up on the sh:or shape itself.
                            object_class=get_object_class(shacl, node, predicate),
                            **or_field_args,
                        )
                    )
            continue

        field_info = {
            "entityType": entity_type,
            "uri": predicate,
            "nodeShape": node_shape,
            "nodeShapes": node_shapes,
            "subjectShape": subject_shape,
            "datatypes": [datatype] if datatype else [],
            "min": min_count,
            "max": max_count,
            "hasValue": has_value,
            "objectClass": object_class,
            "optionalValues": optional_values,
            "conditions": [condition_entry] if condition_entry else [],
            "inputType": determine_input_type(datatype),
        }

        if node_shape and node_shape not in processed_shapes:
            field_info["nestedShape"] = process_nested_shapes(
                shacl,
                display_rules,
                node_shape,
                depth=depth + 1,
                processed_shapes=processed_shapes,
            )

        if or_nodes:
            # NOTE(review): for a newly created field the alternatives carry the
            # row's own objectClass, whereas the merge branch above resolves it
            # per sh:or shape via get_object_class. The stored values are kept
            # as they were; confirm which of the two is intended.
            field_info["or"] = [
                _build_or_field_info(
                    shacl,
                    display_rules,
                    processed_shapes,
                    depth,
                    node=node,
                    object_class=object_class,
                    **or_field_args,
                )
                for node in or_nodes
            ]

        fields_for_predicate.append(field_info)

    return form_fields
def get_shape_target_class(shacl, shape_uri):
    """
    Return the sh:targetClass declared on *shape_uri*.

    Returns:
        str: The first target class found, or None when the shape declares none.
    """
    target_class_query = prepareQuery(
        """
        SELECT ?targetClass
        WHERE {
            ?shape sh:targetClass ?targetClass .
        }
        """,
        initNs={"sh": "http://www.w3.org/ns/shacl#"},
    )
    bindings = {"shape": URIRef(shape_uri)}
    rows = execute_shacl_query(shacl, target_class_query, bindings)

    first_row = next(iter(rows), None)
    if first_row is None:
        return None
    return str(first_row.targetClass)
def get_object_class(shacl, shape_uri, predicate_uri):
    """
    Resolve the class of the objects of *predicate_uri* as declared on *shape_uri*.

    The class is read from the property shape either through sh:node (target
    class of the node shape), through sh:class, or through the same two forms
    inside an sh:or list.

    Returns:
        str: The first non-empty class found, or None.
    """
    object_class_query = prepareQuery(
        """
        SELECT DISTINCT ?targetClass
        WHERE {
            ?shape sh:property ?propertyShape .
            ?propertyShape sh:path ?predicate .
            {
                # Caso 1: definizione diretta con sh:node
                ?propertyShape sh:node ?nodeShape .
                ?nodeShape sh:targetClass ?targetClass .
            } UNION {
                # Caso 2: definizione diretta con sh:class
                ?propertyShape sh:class ?targetClass .
            } UNION {
                # Caso 3: definizione con sh:or che include node shapes
                ?propertyShape sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?choice .
                {
                    ?choice sh:node ?nodeShape .
                    ?nodeShape sh:targetClass ?targetClass .
                } UNION {
                    ?choice sh:class ?targetClass .
                }
            }
        }
        """,
        initNs={
            "sh": "http://www.w3.org/ns/shacl#",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        },
    )

    bindings = {"shape": URIRef(shape_uri), "predicate": URIRef(predicate_uri)}
    rows = execute_shacl_query(shacl, object_class_query, bindings)

    # Keep the first row that actually carries a class.
    first_class = next((row.targetClass for row in rows if row.targetClass), None)
    return None if first_class is None else str(first_class)
def process_nested_shapes(
    shacl, display_rules, shape_uri, depth=0, processed_shapes=None
):
    """
    Recursively process a nested shape and return its fields as a flat list.

    Args:
        shacl: The SHACL graph to query.
        display_rules: Display rules applied to (and used to order) the nested fields.
        shape_uri (str): URI of the shape to process.
        depth (int): Current recursion depth.
        processed_shapes (set): Shapes currently being expanded. A shape found
            in this set is not expanded again, which stops cyclic shapes from
            recursing forever. The shape is removed again before returning, so
            the set tracks shapes in progress rather than shapes ever seen.

    Returns:
        list: The field dictionaries of the nested shape, or an empty list when
        the shape is already being processed.
    """
    if processed_shapes is None:
        processed_shapes = set()

    if shape_uri in processed_shapes:
        return []

    processed_shapes.add(shape_uri)
    try:
        nested_results = execute_shacl_query(
            shacl, COMMON_SPARQL_QUERY, {"shape": URIRef(shape_uri)}
        )
        temp_form_fields = process_query_results(
            shacl, nested_results, display_rules, processed_shapes, depth
        )

        # Apply the display rules to the nested fields.
        if display_rules:
            temp_form_fields = apply_display_rules(
                shacl, temp_form_fields, display_rules
            )
            temp_form_fields = order_form_fields(temp_form_fields, display_rules)

        # Flatten entity type -> predicate -> fields into one list.
        return [
            field
            for fields_by_predicate in temp_form_fields.values()
            for fields in fields_by_predicate.values()
            for field in fields
        ]
    finally:
        # Always release the in-progress marker, even when processing fails, so
        # a caller-supplied set is never left holding a stale entry.
        processed_shapes.discard(shape_uri)
def get_property_order(entity_type, display_rules):
    """
    Return the property order configured for an entity type.

    Only the first display rule whose "class" equals *entity_type* is
    considered: its explicit "propertyOrder" wins, otherwise the order of its
    "displayProperties" is used.

    Args:
        entity_type (str): URI of the entity type.
        display_rules (list): The display rules configuration.

    Returns:
        list: Property URIs in the desired order (empty when nothing applies).
    """
    if not display_rules:
        return []

    for rule in display_rules:
        if rule.get("class") != entity_type:
            continue
        if "propertyOrder" in rule:
            return rule["propertyOrder"]
        return [prop["property"] for prop in rule.get("displayProperties", [])]

    return []
def order_fields(fields, property_order):
    """
    Sort field dictionaries according to a list of property URIs.

    Args:
        fields (list): Field dictionaries to sort; each is identified by its
            "predicate" key or, failing that, its "uri" key.
        property_order (list): Property URIs in the desired order.

    Returns:
        list: A new sorted list. Fields whose property is not listed go to the
        end, keeping their relative order. An empty list is returned when there
        are no fields, and *fields* itself when no order is given.
    """
    if not fields:
        return []
    if not property_order:
        return fields

    # Position of every property in the requested order.
    rank = {}
    for position, property_uri in enumerate(property_order):
        rank[property_uri] = position

    def sort_key(field):
        property_uri = field.get("predicate", field.get("uri", ""))
        # Unlisted properties sort after every listed one.
        return rank.get(property_uri, float("inf"))

    ordered = list(fields)
    ordered.sort(key=sort_key)
    return ordered
def apply_display_rules(shacl, form_fields, display_rules):
    """
    Apply the display rules to the form fields.

    For every rule whose "class" is present in *form_fields*, each configured
    display property that exists for that class has its fields decorated with
    the display information, its nested fields updated with the matching
    "nestedDisplayRules", its intermediate relation resolved and, finally, its
    per-shape "displayRules" applied.

    Args:
        shacl: The SHACL graph, needed by the intermediate-relation and
            sub-display-rule handlers.
        form_fields (dict): The initial form fields extracted from the SHACL shapes.
        display_rules (list): The display rules configuration.

    Returns:
        dict: The same *form_fields* object, updated in place.
    """
    for rule in display_rules:
        entity_class = rule.get("class")
        if not entity_class or entity_class not in form_fields:
            continue

        entity_fields = form_fields[entity_class]
        for prop in rule.get("displayProperties", []):
            prop_uri = prop["property"]
            if prop_uri not in entity_fields:
                continue

            for field_info in entity_fields[prop_uri]:
                add_display_information(field_info, prop)

                # apply_display_rules_to_nested_shapes works on copies and
                # selects the rule by shape URI, so the field's own node shape
                # is passed and the returned list is stored back.
                if "nestedShape" in field_info:
                    field_info["nestedShape"] = apply_display_rules_to_nested_shapes(
                        field_info["nestedShape"], prop, field_info.get("nodeShape")
                    )

                # Same treatment for the nested fields of every sh:or alternative.
                for or_field in field_info.get("or", []):
                    if "nestedShape" in or_field:
                        or_field["nestedShape"] = apply_display_rules_to_nested_shapes(
                            or_field["nestedShape"], prop, or_field.get("nodeShape")
                        )

                if "intermediateRelation" in prop:
                    handle_intermediate_relation(shacl, field_info, prop)

            # Run once per property, after every field has been decorated:
            # handle_sub_display_rules replaces the whole field list, so it has
            # to see the fully prepared fields.
            if "displayRules" in prop:
                handle_sub_display_rules(
                    shacl,
                    form_fields,
                    entity_class,
                    entity_fields[prop_uri],
                    prop,
                )

    return form_fields
def apply_display_rules_to_nested_shapes(nested_fields, parent_prop, shape_uri):
    """
    Return copies of the nested fields with the parent's nested display rules applied.

    The first entry of ``parent_prop["displayRules"]`` whose "shape" equals
    *shape_uri* and that carries "nestedDisplayRules" is used. Each of its
    nested rules is copied (all keys except "property") onto the fields whose
    "predicate" (or, failing that, "uri") equals the rule's "property".

    Args:
        nested_fields (list): Field dictionaries of the nested shape.
        parent_prop (dict): Display configuration of the parent property.
        shape_uri (str): URI of the shape whose nested rules should be applied.

    Returns:
        list: Shallow copies of the fields with the rules applied; an empty
        list when there are no nested fields; *nested_fields* itself when
        *parent_prop* is not a dictionary.
    """
    if not nested_fields:
        return []
    if not isinstance(parent_prop, dict):
        return nested_fields

    # Work on shallow copies so the caller's dictionaries stay untouched.
    copied_fields = [field.copy() for field in nested_fields]

    selected_rule = next(
        (
            rule
            for rule in parent_prop.get("displayRules", [])
            if rule.get("shape") == shape_uri and "nestedDisplayRules" in rule
        ),
        None,
    )
    if selected_rule is None:
        return copied_fields

    for field in copied_fields:
        for nested_rule in selected_rule["nestedDisplayRules"]:
            # The key is re-read for every rule because an applied rule may
            # itself overwrite "predicate" or "uri".
            if field.get("predicate", field.get("uri")) != nested_rule["property"]:
                continue
            field.update(
                (key, value) for key, value in nested_rule.items() if key != "property"
            )

    return copied_fields
def determine_input_type(datatype):
    """
    Choose the form input type that fits an XSD datatype.

    Returns "text" when no datatype is given or when the datatype is not one
    of the known XSD URIs.
    """
    if not datatype:
        return "text"

    datatype_uri = str(datatype)
    input_types = (
        ("text", ("http://www.w3.org/2001/XMLSchema#string",)),
        (
            "number",
            (
                "http://www.w3.org/2001/XMLSchema#integer",
                "http://www.w3.org/2001/XMLSchema#decimal",
                "http://www.w3.org/2001/XMLSchema#float",
                "http://www.w3.org/2001/XMLSchema#double",
            ),
        ),
        ("checkbox", ("http://www.w3.org/2001/XMLSchema#boolean",)),
        ("date", ("http://www.w3.org/2001/XMLSchema#date",)),
        ("time", ("http://www.w3.org/2001/XMLSchema#time",)),
        ("datetime-local", ("http://www.w3.org/2001/XMLSchema#dateTime",)),
        ("url", ("http://www.w3.org/2001/XMLSchema#anyURI",)),
        ("email", ("http://www.w3.org/2001/XMLSchema#email",)),
    )
    for input_type, datatype_uris in input_types:
        if datatype_uri in datatype_uris:
            return input_type
    return "text"
def add_display_information(field_info, prop):
    """
    Copy the display settings of a display-rule property onto a field.

    Only the settings that are present in *prop* are copied; everything else in
    *field_info* is left as it is.

    Args:
        field_info (dict): The field to update (modified in place).
        prop (dict): The property entry taken from the display rules.
    """
    display_keys = (
        "displayName",
        "shouldBeDisplayed",
        "orderedBy",
        "inputType",
        "supportsSearch",
        "minCharsForSearch",
        "searchTarget",
    )
    for key in display_keys:
        if key in prop:
            field_info[key] = prop[key]
def handle_intermediate_relation(shacl, field_info, prop):
    """
    Resolve the 'intermediateRelation' of a display rule and store it on the field.

    The SHACL graph is asked for the property that links the intermediate class
    to the target entity type. The fields nested under that connecting property
    are then grouped by URI and saved, together with the relation itself, in
    ``field_info["intermediateRelation"]``.

    Args:
        shacl: The SHACL graph to query.
        field_info (dict): The field to update (modified in place).
        prop (dict): The property entry taken from the display rules.
    """
    relation = prop["intermediateRelation"]
    target_entity_type = relation.get("targetEntityType")
    intermediate_class = relation.get("class")

    # Find the property that connects the intermediate class to the target class.
    connecting_property_query = prepareQuery(
        """
        SELECT ?property
        WHERE {
            ?shape sh:targetClass ?intermediateClass ;
                   sh:property ?propertyShape .
            ?propertyShape sh:path ?property ;
                           sh:node ?targetNode .
            ?targetNode sh:targetClass ?targetClass.
        }
        """,
        initNs={"sh": "http://www.w3.org/ns/shacl#"},
    )
    rows = shacl.query(
        connecting_property_query,
        initBindings={
            "intermediateClass": URIRef(intermediate_class),
            "targetClass": URIRef(target_entity_type),
        },
    )
    first_row = next(iter(rows), None)
    connecting_property = None if first_row is None else str(first_row.property)

    # Group the fields nested under the connecting property by their URI.
    intermediate_properties = {}
    for nested_field in field_info.get("nestedShape", ()):
        if nested_field.get("uri") != connecting_property:
            continue
        if "nestedShape" not in nested_field:
            continue
        for target_field in nested_field["nestedShape"]:
            uri = target_field.get("uri")
            if uri:
                intermediate_properties.setdefault(uri, []).append(target_field)

    field_info["intermediateRelation"] = {
        "class": intermediate_class,
        "targetEntityType": target_entity_type,
        "connectingProperty": connecting_property,
        "properties": intermediate_properties,
    }
def handle_sub_display_rules(shacl, form_fields, entity_class, field_info_list, prop):
    """
    Apply the per-shape 'displayRules' of a property to its fields.

    Every field whose "nodeShape" matches the "shape" of one of the property's
    display rules is replaced by a new dictionary carrying that rule's display
    name; fields without a matching rule are kept as they are. The resulting
    list replaces ``form_fields[entity_class][prop["property"]]``.

    Args:
        shacl: The SHACL graph, used to read additional properties of the shape.
        form_fields (dict): The form fields to update (modified in place).
        entity_class (str): The entity class the fields belong to.
        field_info_list (list): The current fields of the property.
        prop (dict): The property entry taken from the display rules.
    """
    updated_fields = []

    for original_field in field_info_list:
        # Look for the display rule that targets this field's shape.
        matching_rule = None
        for candidate in prop["displayRules"]:
            if candidate["shape"] == original_field["nodeShape"]:
                matching_rule = candidate
                break

        if matching_rule is None:
            # No rule for this shape: keep the original field.
            updated_fields.append(original_field)
            continue

        # NOTE(review): this copies "datatype" (singular), while the fields built
        # by process_query_results carry a "datatypes" list - confirm which key
        # the consumers of these fields expect.
        new_field = {
            "entityType": entity_class,
            "objectClass": original_field.get("objectClass"),
            "uri": prop["property"],
            "datatype": original_field.get("datatype"),
            "min": original_field.get("min"),
            "max": original_field.get("max"),
            "hasValue": original_field.get("hasValue"),
            "nodeShape": original_field.get("nodeShape"),
            "nodeShapes": original_field.get("nodeShapes"),
            "subjectShape": original_field.get("subjectShape"),
            "nestedShape": original_field.get("nestedShape"),
            "displayName": matching_rule["displayName"],
            "optionalValues": original_field.get("optionalValues", []),
            "orderedBy": original_field.get("orderedBy"),
            "or": original_field.get("or", []),
        }

        if "intermediateRelation" in original_field:
            new_field["intermediateRelation"] = original_field["intermediateRelation"]

        # Add the fixed-value properties declared on the SHACL shape, if any.
        if "shape" in matching_rule:
            additional_properties = extract_additional_properties(
                shacl, matching_rule["shape"]
            )
            if additional_properties:
                new_field["additionalProperties"] = additional_properties

        updated_fields.append(new_field)

    form_fields[entity_class][prop["property"]] = updated_fields
def extract_additional_properties(shacl, shape_uri):
    """
    Collect the fixed-value properties declared on a SHACL node shape.

    Args:
        shacl: The SHACL graph to query.
        shape_uri (str): URI of the SHACL shape.

    Returns:
        dict: Maps each property path that has an sh:hasValue constraint to
        that value (both as strings). When a path occurs more than once the
        last row wins.
    """
    has_value_query = prepareQuery(
        """
        SELECT ?predicate ?hasValue
        WHERE {
            ?shape a sh:NodeShape ;
                   sh:property ?property .
            ?property sh:path ?predicate ;
                      sh:hasValue ?hasValue .
        }
        """,
        initNs={"sh": "http://www.w3.org/ns/shacl#"},
    )
    rows = shacl.query(has_value_query, initBindings={"shape": URIRef(shape_uri)})
    return {str(row.predicate): str(row.hasValue) for row in rows}
def order_form_fields(form_fields, display_rules):
    """
    Order the form fields according to the display rules.

    For every rule whose "class" is present in *form_fields*, the properties
    named in its "displayProperties" come first, in that order, followed by the
    remaining properties of the class in their existing order.

    Args:
        form_fields (dict): The form fields, possibly already modified by the
            display rules.
        display_rules (list): The display rules configuration.

    Returns:
        OrderedDict: The ordered fields of the classes covered by a display
        rule. When there are no display rules, *form_fields* itself is
        returned unchanged.
    """
    if not display_rules:
        return form_fields

    ordered_form_fields = OrderedDict()
    for rule in display_rules:
        entity_class = rule.get("class")
        if not entity_class or entity_class not in form_fields:
            continue

        entity_fields = form_fields[entity_class]
        listed_properties = [
            prop_rule["property"] for prop_rule in rule.get("displayProperties", [])
        ]

        # Properties named by the rule first ...
        ordered_properties = OrderedDict(
            (prop, entity_fields[prop])
            for prop in listed_properties
            if prop in entity_fields
        )
        # ... then whatever the rule does not mention.
        for prop in entity_fields:
            if prop not in listed_properties:
                ordered_properties[prop] = entity_fields[prop]

        ordered_form_fields[entity_class] = ordered_properties

    return ordered_form_fields
def get_valid_predicates(triples):
    """
    Work out which predicates can be added to or removed from an entity.

    The entity is described by *triples*; its types are the objects of its
    rdf:type triples. The SHACL shapes targeting the highest-priority type (or
    all types, when no priority is available) provide the constraints.

    Args:
        triples: Iterable of (subject, predicate, object) triples of the entity.

    Returns:
        tuple: Seven elements, in this order:
            - predicates that can still be added (maxCount not reached);
            - predicates that can be deleted (count differs from minCount);
            - datatypes per predicate (xsd:string when none is declared);
            - mandatory values per predicate (from sh:hasValue);
            - optional values per predicate (from sh:in);
            - the entity types that were used;
            - all predicates allowed by the shapes.
        When the entity has no type or no SHACL graph is available, the
        existing predicates are returned as both addable and deletable, every
        datatype defaults to xsd:string, and the two value mappings are empty.
    """
    from collections import Counter

    shacl = get_shacl_graph()

    existing_predicates = [triple[1] for triple in triples]
    # Occurrences of each predicate, keyed by its string form.
    predicate_counts = {
        str(predicate): count
        for predicate, count in Counter(existing_predicates).items()
    }
    default_datatypes = {
        str(predicate): XSD.string for predicate in existing_predicates
    }
    s_types = [triple[2] for triple in triples if triple[1] == RDF.type]

    # Without a type or without shapes there is nothing to validate against.
    if not s_types or not shacl:
        return (
            existing_predicates,
            existing_predicates,
            default_datatypes,
            dict(),
            dict(),
            s_types,
            [str(predicate) for predicate in existing_predicates],
        )

    highest_priority_class = get_highest_priority_class(s_types)
    s_types = [highest_priority_class] if highest_priority_class else s_types

    query_string = f"""
        SELECT ?predicate ?datatype ?maxCount ?minCount ?hasValue (GROUP_CONCAT(?optionalValue; separator=",") AS ?optionalValues) WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?property .
            VALUES ?type {{<{'> <'.join(s_types)}>}}
            ?property sh:path ?predicate .
            OPTIONAL {{?property sh:datatype ?datatype .}}
            OPTIONAL {{?property sh:maxCount ?maxCount .}}
            OPTIONAL {{?property sh:minCount ?minCount .}}
            OPTIONAL {{?property sh:hasValue ?hasValue .}}
            OPTIONAL {{
                ?property sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?property sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
            }}
            FILTER (isURI(?predicate))
        }}
        GROUP BY ?predicate ?datatype ?maxCount ?minCount ?hasValue
    """

    query = prepareQuery(
        query_string,
        initNs={
            "sh": "http://www.w3.org/ns/shacl#",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        },
    )
    results = shacl.query(query)

    # One (predicate, constraints) pair per result row.
    constraints = [
        (
            str(row.predicate),
            {
                "min": 0 if row.minCount is None else int(row.minCount),
                "max": None if row.maxCount is None else str(row.maxCount),
                "hasValue": row.hasValue,
                "optionalValues": (
                    row.optionalValues.split(",") if row.optionalValues else []
                ),
            },
        )
        for row in results
    ]

    can_be_added = set()
    can_be_deleted = set()
    mandatory_values = defaultdict(list)
    for predicate, ranges in constraints:
        if ranges["hasValue"]:
            # A fixed value is mandatory: it is neither addable nor deletable.
            mandatory_values[predicate].append(str(ranges["hasValue"]))
            continue

        current_count = predicate_counts.get(predicate, 0)
        max_reached = ranges["max"] is not None and int(ranges["max"]) <= current_count
        if not max_reached:
            can_be_added.add(predicate)
        if not (ranges["min"] is not None and int(ranges["min"]) == current_count):
            can_be_deleted.add(predicate)

    datatypes = defaultdict(list)
    for row in results:
        datatype = row.datatype if row.datatype else XSD.string
        datatypes[str(row.predicate)].append(str(datatype))

    optional_values = dict()
    for predicate, ranges in constraints:
        optional_values.setdefault(predicate, list()).extend(ranges["optionalValues"])

    return (
        list(can_be_added),
        list(can_be_deleted),
        dict(datatypes),
        mandatory_values,
        optional_values,
        s_types,
        {predicate for predicate, _ in constraints},
    )
928def validate_new_triple(
929 subject, predicate, new_value, action: str, old_value=None, entity_types=None
930):
931 data_graph = fetch_data_graph_for_subject(subject)
932 if old_value is not None:
933 matching_triples = [
934 triple[2]
935 for triple in data_graph.triples((URIRef(subject), URIRef(predicate), None))
936 if str(triple[2]) == str(old_value)
937 ]
938 # Only update old_value if we found a match in the graph
939 if matching_triples:
940 old_value = matching_triples[0]
941 if not len(get_shacl_graph()):
942 # If there's no SHACL, we accept any value but preserve datatype if available
943 if validators.url(new_value):
944 return URIRef(new_value), old_value, ""
945 else:
946 # Preserve the datatype of the old value if it's a Literal
947 if (
948 old_value is not None
949 and isinstance(old_value, Literal)
950 and old_value.datatype
951 ):
952 return Literal(new_value, datatype=old_value.datatype), old_value, ""
953 else:
954 return Literal(new_value), old_value, ""
956 # Get entity types from the data graph
957 s_types = [
958 triple[2] for triple in data_graph.triples((URIRef(subject), RDF.type, None))
959 ]
961 # If entity_types is provided, use it (useful for nested entities being created)
962 if entity_types and not s_types:
963 if isinstance(entity_types, list):
964 s_types = entity_types
965 else:
966 s_types = [entity_types]
968 # Get types for entities that have this subject as their object
969 # This is crucial for proper SHACL validation in cases where constraints depend on the context
970 # Example: When validating an identifier's value (e.g., DOI, ISSN, ORCID):
971 # - The identifier itself is of type datacite:Identifier
972 # - But its format constraints depend on what owns it:
973 # * A DOI for an article follows one pattern
974 # * An ISSN for a journal follows another
975 # * An ORCID for a person follows yet another
976 # By including these "inverse" types, we ensure validation considers the full context
977 inverse_types = []
978 for s, p, o in data_graph.triples((None, None, URIRef(subject))):
979 # Ottieni i tipi dell'entità che ha il soggetto come oggetto
980 s_types_inverse = [t[2] for t in data_graph.triples((s, RDF.type, None))]
981 inverse_types.extend(s_types_inverse)
983 # Add inverse types to s_types
984 s_types.extend(inverse_types)
986 query = f"""
987 PREFIX sh: <http://www.w3.org/ns/shacl#>
988 SELECT DISTINCT ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message
989 (GROUP_CONCAT(DISTINCT COALESCE(?optionalValue, ""); separator=",") AS ?optionalValues)
990 (GROUP_CONCAT(DISTINCT COALESCE(?conditionPath, ""); separator=",") AS ?conditionPaths)
991 (GROUP_CONCAT(DISTINCT COALESCE(?conditionValue, ""); separator=",") AS ?conditionValues)
992 WHERE {{
993 ?shape sh:targetClass ?type ;
994 sh:property ?propertyShape .
995 ?propertyShape sh:path ?path .
996 FILTER(?path = <{predicate}>)
997 VALUES ?type {{<{'> <'.join(s_types)}>}}
998 OPTIONAL {{?propertyShape sh:datatype ?datatype .}}
999 OPTIONAL {{?propertyShape sh:maxCount ?maxCount .}}
1000 OPTIONAL {{?propertyShape sh:minCount ?minCount .}}
1001 OPTIONAL {{?propertyShape sh:class ?a_class .}}
1002 OPTIONAL {{
1003 ?propertyShape sh:or ?orList .
1004 ?orList rdf:rest*/rdf:first ?orConstraint .
1005 ?orConstraint sh:datatype ?datatype .
1006 OPTIONAL {{?orConstraint sh:class ?class .}}
1007 }}
1008 OPTIONAL {{
1009 ?propertyShape sh:classIn ?classInList .
1010 ?classInList rdf:rest*/rdf:first ?classIn .
1011 }}
1012 OPTIONAL {{
1013 ?propertyShape sh:in ?list .
1014 ?list rdf:rest*/rdf:first ?optionalValue .
1015 }}
1016 OPTIONAL {{
1017 ?propertyShape sh:pattern ?pattern .
1018 OPTIONAL {{?propertyShape sh:message ?message .}}
1019 }}
1020 OPTIONAL {{
1021 ?propertyShape sh:condition ?conditionNode .
1022 ?conditionNode sh:path ?conditionPath ;
1023 sh:hasValue ?conditionValue .
1024 }}
1025 }}
1026 GROUP BY ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message
1027 """
1028 shacl = get_shacl_graph()
1029 custom_filter = get_custom_filter()
1030 results = shacl.query(query)
1031 property_exists = [row.path for row in results]
1032 if not property_exists:
1033 return (
1034 None,
1035 old_value,
1036 gettext(
1037 "The property %(predicate)s is not allowed for resources of type %(s_type)s",
1038 predicate=custom_filter.human_readable_predicate(predicate, s_types),
1039 s_type=custom_filter.human_readable_predicate(s_types[0], s_types),
1040 ),
1041 )
1042 datatypes = [row.datatype for row in results if row.datatype is not None]
1043 classes = [row.a_class for row in results if row.a_class]
1044 classes.extend([row.classIn for row in results if row.classIn])
1045 optional_values_str = [row.optionalValues for row in results if row.optionalValues]
1046 optional_values_str = optional_values_str[0] if optional_values_str else ""
1047 optional_values = [value for value in optional_values_str.split(",") if value]
1049 max_count = [row.maxCount for row in results if row.maxCount]
1050 min_count = [row.minCount for row in results if row.minCount]
1051 max_count = int(max_count[0]) if max_count else None
1052 min_count = int(min_count[0]) if min_count else None
1054 current_values = list(
1055 data_graph.triples((URIRef(subject), URIRef(predicate), None))
1056 )
1057 current_count = len(current_values)
1059 if action == "create":
1060 new_count = current_count + 1
1061 elif action == "delete":
1062 new_count = current_count - 1
1063 else: # update
1064 new_count = current_count
1066 if max_count is not None and new_count > max_count:
1067 value = gettext("value") if max_count == 1 else gettext("values")
1068 return (
1069 None,
1070 old_value,
1071 gettext(
1072 "The property %(predicate)s allows at most %(max_count)s %(value)s",
1073 predicate=custom_filter.human_readable_predicate(predicate, s_types),
1074 max_count=max_count,
1075 value=value,
1076 ),
1077 )
1078 if min_count is not None and new_count < min_count:
1079 value = gettext("value") if min_count == 1 else gettext("values")
1080 return (
1081 None,
1082 old_value,
1083 gettext(
1084 "The property %(predicate)s requires at least %(min_count)s %(value)s",
1085 predicate=custom_filter.human_readable_predicate(predicate, s_types),
1086 min_count=min_count,
1087 value=value,
1088 ),
1089 )
1091 # For delete operations, we only need to validate cardinality constraints (which we've already done)
1092 # No need to validate the datatype or class of the value being deleted
1093 if action == "delete":
1094 return None, old_value, ""
1096 if optional_values and new_value not in optional_values:
1097 optional_value_labels = [
1098 custom_filter.human_readable_predicate(value, s_types)
1099 for value in optional_values
1100 ]
1101 return (
1102 None,
1103 old_value,
1104 gettext(
1105 "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires one of the following values: %(o_values)s",
1106 new_value=custom_filter.human_readable_predicate(new_value, s_types),
1107 property=custom_filter.human_readable_predicate(predicate, s_types),
1108 o_values=", ".join(
1109 [f"<code>{label}</code>" for label in optional_value_labels]
1110 ),
1111 ),
1112 )
1114 # Check pattern constraints
1115 for row in results:
1116 if row.pattern:
1117 # Check if there are conditions for this pattern
1118 condition_paths = row.conditionPaths.split(",") if row.conditionPaths else []
1119 condition_values = row.conditionValues.split(",") if row.conditionValues else []
1120 conditions_met = True
1122 # If there are conditions, check if they are met
1123 for path, value in zip(condition_paths, condition_values):
1124 if path and value:
1125 # Check if the condition triple exists in the data graph
1126 condition_exists = any(
1127 data_graph.triples((URIRef(subject), URIRef(path), URIRef(value)))
1128 )
1129 if not condition_exists:
1130 conditions_met = False
1131 break
1133 # Only validate pattern if conditions are met
1134 if conditions_met:
1135 pattern = str(row.pattern)
1136 if not re.match(pattern, new_value):
1137 error_message = str(row.message) if row.message else f"Value must match pattern: {pattern}"
1138 return None, old_value, error_message
1140 if classes:
1141 if not validators.url(new_value):
1142 return (
1143 None,
1144 old_value,
1145 gettext(
1146 "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
1147 new_value=custom_filter.human_readable_predicate(
1148 new_value, s_types
1149 ),
1150 property=custom_filter.human_readable_predicate(predicate, s_types),
1151 o_types=", ".join(
1152 [
1153 f"<code>{custom_filter.human_readable_predicate(o_class, s_types)}</code>"
1154 for o_class in classes
1155 ]
1156 ),
1157 ),
1158 )
1159 valid_value = convert_to_matching_class(
1160 new_value, classes, entity_types=s_types
1161 )
1162 if valid_value is None:
1163 return (
1164 None,
1165 old_value,
1166 gettext(
1167 "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
1168 new_value=custom_filter.human_readable_predicate(
1169 new_value, s_types
1170 ),
1171 property=custom_filter.human_readable_predicate(predicate, s_types),
1172 o_types=", ".join(
1173 [
1174 f"<code>{custom_filter.human_readable_predicate(o_class, s_types)}</code>"
1175 for o_class in classes
1176 ]
1177 ),
1178 ),
1179 )
1180 return valid_value, old_value, ""
1181 elif datatypes:
1182 valid_value = convert_to_matching_literal(new_value, datatypes)
1183 if valid_value is None:
1184 datatype_labels = [get_datatype_label(datatype) for datatype in datatypes]
1185 return (
1186 None,
1187 old_value,
1188 gettext(
1189 "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
1190 new_value=custom_filter.human_readable_predicate(
1191 new_value, s_types
1192 ),
1193 property=custom_filter.human_readable_predicate(predicate, s_types),
1194 o_types=", ".join(
1195 [f"<code>{label}</code>" for label in datatype_labels]
1196 ),
1197 ),
1198 )
1199 return valid_value, old_value, ""
1200 # Se non ci sono datatypes o classes specificati, determiniamo il tipo in base a old_value e new_value
1201 if isinstance(old_value, Literal):
1202 if old_value.datatype:
1203 valid_value = Literal(new_value, datatype=old_value.datatype)
1204 else:
1205 valid_value = Literal(new_value, datatype=XSD.string)
1206 elif isinstance(old_value, URIRef):
1207 # Se old_value è un URIRef ma new_value è None, restituiamo old_value
1208 if new_value is None:
1209 return old_value, old_value, ""
1210 valid_value = URIRef(new_value)
1211 elif new_value is not None and validators.url(new_value):
1212 valid_value = URIRef(new_value)
1213 else:
1214 valid_value = Literal(new_value, datatype=XSD.string)
1215 return valid_value, old_value, ""
def convert_to_matching_class(object_value, classes, entity_types=None):
    """Return ``object_value`` as a ``URIRef`` if it is acceptable for ``classes``.

    The types of the value are the ``rdf:type`` objects found for it in the
    graph returned by ``fetch_data_graph_for_subject``. When that graph holds
    no type for the value, ``entity_types`` (a list, or a single value) is
    used in their place.

    Args:
        object_value: Candidate object value; it must pass ``validators.url``.
        classes: Required classes, compared through their ``str`` form.
        entity_types: Optional fallback types. Whenever this argument is
            truthy the value is accepted even if none of its types is among
            ``classes``.

    Returns:
        ``URIRef(object_value)`` when the value is accepted, ``None`` when
        ``classes`` is empty, the value is ``None`` or not a URL, or no type
        matches and ``entity_types`` is not provided.
    """
    # Nothing to match against, or nothing to match.
    if not classes or object_value is None:
        return None

    # Only URLs can denote an entity.
    if not validators.url(str(object_value)):
        return None

    # Collect the declared rdf:type values of the candidate entity.
    graph = fetch_data_graph_for_subject(object_value)
    found_types = set()
    for triple in graph.triples((URIRef(object_value), RDF.type, None)):
        found_types.add(str(triple[2]))

    # Fall back to the caller-supplied types when the graph knows none.
    if entity_types and not found_types:
        if isinstance(entity_types, list):
            found_types = set(entity_types)
        else:
            found_types = {entity_types}

    required = {str(cls) for cls in classes}

    # Accept on a type match; a truthy entity_types also forces acceptance
    # when there is no match.
    if found_types & required or entity_types:
        return URIRef(object_value)

    return None
def convert_to_matching_literal(object_value, datatypes):
    """Build a ``Literal`` for ``object_value`` typed with one of ``datatypes``.

    Each datatype is looked up in ``DATATYPE_MAPPING`` (entries are indexed as
    ``entry[0]`` = datatype URI, ``entry[1]`` = validation callable). The first
    datatype whose validation callable accepts the value is used.

    A datatype without a registered validation callable cannot be checked.
    Such a datatype no longer short-circuits the search: the remaining
    datatypes are still tried, and only if none of them accepts the value is
    the value returned as an ``xsd:string`` literal. This keeps the result
    independent of the order in which the datatypes were collected.

    Args:
        object_value: The raw value to convert.
        datatypes: Candidate datatype URIs.

    Returns:
        A typed ``Literal``, or ``None`` when ``datatypes`` is empty,
        ``object_value`` is ``None``, or every datatype has a validator and
        all of them reject the value.
    """
    # Handle edge cases
    if not datatypes or object_value is None:
        return None

    has_unverifiable_datatype = False
    for datatype in datatypes:
        datatype_key = str(datatype)
        validation_func = next(
            (d[1] for d in DATATYPE_MAPPING if str(d[0]) == datatype_key), None
        )
        if validation_func is None:
            # No validator registered: remember it and keep looking so that a
            # datatype that can actually be verified takes precedence.
            has_unverifiable_datatype = True
            continue
        if validation_func(object_value):
            return Literal(object_value, datatype=datatype)

    # Fallback kept from the original behavior for datatypes without validator.
    if has_unverifiable_datatype:
        return Literal(object_value, datatype=XSD.string)

    return None
def get_datatype_label(datatype_uri):
    """Return a human-readable label for a datatype URI.

    Lookup order:
      1. a built-in table of common XSD datatypes;
      2. the label stored in ``DATATYPE_MAPPING`` (third item of each entry);
      3. the custom filter's ``human_readable_predicate``.

    If the custom filter yields a label that is merely a trailing part of the
    URI, the URI itself is returned instead. The URI is also returned when no
    custom filter is available.

    Args:
        datatype_uri: The datatype URI, or ``None``.

    Returns:
        The label, the original ``datatype_uri``, or ``None`` when
        ``datatype_uri`` is ``None``.
    """
    if datatype_uri is None:
        return None

    uri_key = str(datatype_uri)

    # Labels for the most common XSD datatypes.
    builtin_labels = {
        str(XSD.string): "String",
        str(XSD.integer): "Integer",
        str(XSD.int): "Integer",
        str(XSD.float): "Float",
        str(XSD.double): "Double",
        str(XSD.decimal): "Decimal",
        str(XSD.boolean): "Boolean",
        str(XSD.date): "Date",
        str(XSD.time): "Time",
        str(XSD.dateTime): "DateTime",
        str(XSD.anyURI): "URI",
    }
    builtin_label = builtin_labels.get(uri_key)
    if builtin_label is not None:
        return builtin_label

    # Next, look for a label in the project-wide datatype mapping.
    for mapped_uri, _, mapped_label in DATATYPE_MAPPING:
        if str(mapped_uri) == uri_key:
            return mapped_label

    # Finally, delegate to the custom filter when one is configured.
    custom_filter = get_custom_filter()
    if not custom_filter:
        return datatype_uri

    custom_label = custom_filter.human_readable_predicate(datatype_uri, [])
    # A label that is only the tail of the URI adds nothing: prefer the full URI.
    is_uri_tail = (
        custom_label
        and custom_label != datatype_uri
        and datatype_uri.endswith(custom_label)
    )
    return datatype_uri if is_uri_tail else custom_label