Coverage for heritrace / utils / shacl_display.py: 97%
316 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import json
6from collections import OrderedDict, defaultdict
7from collections.abc import Iterable
8from dataclasses import dataclass
9from pathlib import Path
10from typing import cast
12from flask import Flask
13from rdflib import Graph, URIRef
14from rdflib.plugins.sparql import prepareQuery
15from rdflib.plugins.sparql.sparql import Query
16from rdflib.query import Result, ResultRow
18from heritrace.sparql import select_results
19from heritrace.utils.filters import Filter
22@dataclass(slots=True)
23class ShaclProcessingContext:
24 shacl: Graph
25 display_rules: list[dict[str, object]] | None
26 app: Flask
27 processed_shapes: set[str]
30@dataclass(slots=True)
31class _ParsedRow:
32 subject_shape: str
33 entity_type: str
34 predicate: str
35 node_shape: str | None
36 has_value: str | None
37 object_class: str | None
38 min_count: int
39 max_count: int | None
40 datatype: str | None
41 optional_values: list[str]
42 or_nodes: list[str]
43 entity_key: tuple[str, str]
44 condition_entry: dict[str, object]
45 node_shapes: list[str]
48COMMON_SPARQL_QUERY = prepareQuery(
49 """
50 SELECT ?shape ?type ?predicate ?node_shape ?datatype
51 ?max_count ?min_count ?has_value ?object_class
52 ?condition_path ?condition_value ?pattern ?message
53 (GROUP_CONCAT(?optional_value; separator=",")
54 AS ?optional_values)
55 (GROUP_CONCAT(?or_node; separator=",") AS ?or_nodes)
56 WHERE {
57 ?shape sh:targetClass ?type ;
58 sh:property ?property .
59 ?property sh:path ?predicate .
60 OPTIONAL {
61 ?property sh:node ?node_shape .
62 OPTIONAL {
63 ?node_shape sh:targetClass ?object_class .
64 }
65 }
66 OPTIONAL {
67 ?property sh:or ?orList .
68 {
69 ?orList rdf:rest*/rdf:first ?or_constraint .
70 ?or_constraint sh:datatype ?datatype .
71 } UNION {
72 ?orList rdf:rest*/rdf:first ?or_node_shape .
73 ?or_node_shape sh:node ?or_node .
74 } UNION {
75 ?orList rdf:rest*/rdf:first ?or_constraint .
76 ?or_constraint sh:hasValue ?optional_value .
77 }
78 }
79 OPTIONAL { ?property sh:datatype ?datatype . }
80 OPTIONAL { ?property sh:maxCount ?max_count . }
81 OPTIONAL { ?property sh:minCount ?min_count . }
82 OPTIONAL { ?property sh:hasValue ?has_value . }
83 OPTIONAL {
84 ?property sh:in ?list .
85 ?list rdf:rest*/rdf:first ?optional_value .
86 }
87 OPTIONAL {
88 ?property sh:condition ?condition_node .
89 ?condition_node sh:path ?condition_path ;
90 sh:hasValue ?condition_value .
91 }
92 OPTIONAL { ?property sh:pattern ?pattern . }
93 OPTIONAL { ?property sh:message ?message . }
94 FILTER (isURI(?predicate))
95 }
96 GROUP BY ?shape ?type ?predicate ?node_shape ?datatype
97 ?max_count ?min_count ?has_value ?object_class
98 ?condition_path ?condition_value ?pattern ?message
99""",
100 initNs={
101 "sh": "http://www.w3.org/ns/shacl#",
102 "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
103 },
104)
107def _parse_row(row: ResultRow) -> _ParsedRow:
108 subject_shape = str(row.shape)
109 entity_type = str(row.type)
110 predicate = str(row.predicate)
111 node_shape = str(row.node_shape) if row.node_shape else None
112 has_value = str(row.has_value) if row.has_value else None
113 object_class = str(row.object_class) if row.object_class else None
114 min_count = 0 if row.min_count is None else int(row.min_count)
115 max_count = None if row.max_count is None else int(row.max_count)
116 datatype = str(row.datatype) if row.datatype else None
117 optional_values = [v for v in (row.optional_values or "").split(",") if v]
118 or_nodes = [v for v in (row.or_nodes or "").split(",") if v]
120 condition_entry: dict[str, object] = {}
121 if row.condition_path and row.condition_value:
122 condition_entry["condition"] = {
123 "path": str(row.condition_path),
124 "value": str(row.condition_value),
125 }
126 if row.pattern:
127 condition_entry["pattern"] = str(row.pattern)
128 if row.message:
129 condition_entry["message"] = str(row.message)
131 node_shapes = []
132 if node_shape:
133 node_shapes.append(node_shape)
134 node_shapes.extend(or_nodes)
136 return _ParsedRow(
137 subject_shape=subject_shape,
138 entity_type=entity_type,
139 predicate=predicate,
140 node_shape=node_shape,
141 has_value=has_value,
142 object_class=object_class,
143 min_count=min_count,
144 max_count=max_count,
145 datatype=datatype,
146 optional_values=optional_values,
147 or_nodes=or_nodes,
148 entity_key=(entity_type, subject_shape),
149 condition_entry=condition_entry,
150 node_shapes=node_shapes,
151 )
154def _find_existing_field(
155 fields: list[dict[str, object]],
156 parsed: _ParsedRow,
157) -> dict[str, object] | None:
158 for field in fields:
159 if (
160 field.get("nodeShape") == parsed.node_shape
161 and field.get("nodeShapes") == parsed.node_shapes
162 and field.get("subjectShape") == parsed.subject_shape
163 and field.get("hasValue") == parsed.has_value
164 and field.get("objectClass") == parsed.object_class
165 and field.get("min") == parsed.min_count
166 and field.get("max") == parsed.max_count
167 and field.get("optionalValues") == parsed.optional_values
168 ):
169 return field
170 return None
173def _process_or_nodes(
174 ctx: ShaclProcessingContext,
175 parsed: _ParsedRow,
176 custom_filter: Filter,
177 field_info: dict[str, object],
178 depth: int,
179) -> None:
180 field_info["or"] = []
181 for node in parsed.or_nodes:
182 entity_type_or_node = get_shape_target_class(ctx.shacl, node)
183 object_class = get_object_class(ctx.shacl, node, parsed.predicate)
184 shape_display_name = custom_filter.human_readable_class(
185 (entity_type_or_node, node)
186 )
187 or_field_info: dict[str, object] = {
188 "entityType": entity_type_or_node,
189 "uri": parsed.predicate,
190 "displayName": shape_display_name,
191 "subjectShape": parsed.subject_shape,
192 "nodeShape": node,
193 "min": parsed.min_count,
194 "max": parsed.max_count,
195 "hasValue": parsed.has_value,
196 "objectClass": object_class,
197 "optionalValues": parsed.optional_values,
198 "conditions": ([parsed.condition_entry] if parsed.condition_entry else []),
199 "shouldBeDisplayed": True,
200 }
201 if node not in ctx.processed_shapes:
202 or_field_info["nestedShape"] = process_nested_shapes(
203 ctx,
204 node,
205 depth=depth + 1,
206 )
207 field_info["or"].append(or_field_info)
210def _process_single_row(
211 ctx: ShaclProcessingContext,
212 parsed: _ParsedRow,
213 custom_filter: Filter,
214 form_fields: defaultdict[tuple[str, str], dict[str, list[dict[str, object]]]],
215 depth: int,
216) -> None:
217 if parsed.predicate not in form_fields[parsed.entity_key]:
218 form_fields[parsed.entity_key][parsed.predicate] = []
220 existing_field = _find_existing_field(
221 form_fields[parsed.entity_key][parsed.predicate], parsed
222 )
224 if existing_field:
225 if parsed.datatype and str(parsed.datatype) not in cast(
226 "list", existing_field.get("datatypes", [])
227 ):
228 cast("list", existing_field.setdefault("datatypes", [])).append(
229 str(parsed.datatype)
230 )
231 if parsed.condition_entry:
232 cast("list", existing_field.setdefault("conditions", [])).append(
233 parsed.condition_entry
234 )
235 return
237 field_info: dict[str, object] = {
238 "entityType": parsed.entity_type,
239 "uri": parsed.predicate,
240 "nodeShape": parsed.node_shape,
241 "nodeShapes": parsed.node_shapes,
242 "subjectShape": parsed.subject_shape,
243 "entityKey": parsed.entity_key,
244 "datatypes": [parsed.datatype] if parsed.datatype else [],
245 "min": parsed.min_count,
246 "max": parsed.max_count,
247 "hasValue": parsed.has_value,
248 "objectClass": parsed.object_class,
249 "optionalValues": parsed.optional_values,
250 "conditions": ([parsed.condition_entry] if parsed.condition_entry else []),
251 "inputType": determine_input_type(parsed.datatype),
252 "shouldBeDisplayed": True,
253 }
255 if parsed.node_shape and parsed.node_shape not in ctx.processed_shapes:
256 field_info["nestedShape"] = process_nested_shapes(
257 ctx,
258 parsed.node_shape,
259 depth=depth + 1,
260 )
262 if parsed.or_nodes:
263 _process_or_nodes(ctx, parsed, custom_filter, field_info, depth)
265 form_fields[parsed.entity_key][parsed.predicate].append(field_info)
268def process_query_results(
269 ctx: ShaclProcessingContext,
270 results: Iterable[ResultRow],
271 depth: int = 0,
272) -> defaultdict[tuple[str, str], dict[str, list[dict[str, object]]]]:
273 form_fields: defaultdict[tuple[str, str], dict[str, list[dict[str, object]]]] = (
274 defaultdict(dict)
275 )
277 with (Path(__file__).parent / "context.json").open() as config_file:
278 context = json.load(config_file)["@context"]
280 custom_filter = Filter(context, ctx.display_rules, ctx.app.config["DATASET_DB_URL"])
282 for row in results:
283 parsed = _parse_row(row)
284 _process_single_row(ctx, parsed, custom_filter, form_fields, depth)
286 return form_fields
289def process_nested_shapes(
290 ctx: ShaclProcessingContext,
291 shape_uri: str,
292 depth: int = 0,
293) -> list[dict[str, object]]:
294 if shape_uri in ctx.processed_shapes:
295 return []
297 ctx.processed_shapes.add(shape_uri)
298 init_bindings = {"shape": URIRef(shape_uri)}
299 nested_results = execute_shacl_query(ctx.shacl, COMMON_SPARQL_QUERY, init_bindings)
300 nested_fields = []
302 temp_form_fields = process_query_results(
303 ctx,
304 select_results(nested_results),
305 depth=depth,
306 )
308 if ctx.display_rules:
309 temp_form_fields = apply_display_rules(
310 ctx.shacl, temp_form_fields, ctx.display_rules
311 )
312 temp_form_fields = order_form_fields(temp_form_fields, ctx.display_rules)
314 for entity_type in temp_form_fields:
315 for predicate in temp_form_fields[entity_type]:
316 nested_fields.extend(temp_form_fields[entity_type][predicate])
318 ctx.processed_shapes.remove(shape_uri)
319 return nested_fields
322def get_property_order(
323 entity_type: str,
324 display_rules: list[dict[str, object]] | None,
325) -> list[str | None]:
326 """
327 Recupera l'ordine delle proprietà per un tipo di entità dalle regole di
328 visualizzazione.
330 Argomenti:
331 entity_type (str): L'URI del tipo di entità.
333 Restituisce:
334 list: Una lista di URI di proprietà nell'ordine desiderato.
335 """
336 if not display_rules:
337 return []
339 for rule in display_rules:
340 if rule.get("class") == entity_type and "propertyOrder" in rule:
341 return cast("list[str | None]", rule["propertyOrder"])
342 if rule.get("class") == entity_type:
343 display_props = cast(
344 "list[dict[str, object]]", rule.get("displayProperties", [])
345 )
346 return [
347 cast("str | None", prop.get("property") or prop.get("virtual_property"))
348 for prop in display_props
349 if prop.get("property") or prop.get("virtual_property")
350 ]
351 return []
354def order_fields(
355 fields: list[dict[str, object]],
356 property_order: list[str],
357) -> list[dict[str, object]]:
358 """
359 Ordina i campi secondo l'ordine specificato delle proprietà.
361 Argomenti:
362 fields (list): Una lista di dizionari dei campi da ordinare.
363 property_order (list): Una lista di URI di proprietà nell'ordine desiderato.
365 Restituisce:
366 list: Una lista ordinata di dizionari dei campi.
367 """
368 if not fields:
369 return []
370 if not property_order:
371 return fields
373 # Create a dictionary to map predicates to their position in property_order
374 order_dict = {pred: i for i, pred in enumerate(property_order)}
376 # Sort fields based on their position in property_order
377 # Fields not in property_order will be placed at the end
378 return sorted(
379 fields,
380 key=lambda f: order_dict.get(
381 str(f.get("predicate", f.get("uri", ""))), float("inf")
382 ),
383 )
386def _find_matching_entity_keys(
387 form_fields: dict[
388 tuple[str, str],
389 dict[str, list[dict[str, object]]],
390 ],
391 entity_class: str | None,
392 entity_shape: str | None,
393) -> list[tuple[str, str]]:
394 if entity_class and entity_shape:
395 entity_key = (entity_class, entity_shape)
396 return [entity_key] if entity_key in form_fields else []
397 if entity_class:
398 return [key for key in form_fields if key[0] == entity_class]
399 if entity_shape:
400 return [key for key in form_fields if key[1] == entity_shape]
401 return []
404def _get_ordered_properties_from_rule(
405 rule: dict[str, object],
406) -> list[str | None]:
407 display_props = cast("list[dict[str, object]]", rule.get("displayProperties", []))
408 return [
409 cast(
410 "str | None",
411 prop_rule.get("property") or prop_rule.get("virtual_property"),
412 )
413 for prop_rule in display_props
414 if prop_rule.get("property") or prop_rule.get("virtual_property")
415 ]
418def _order_entity_fields(
419 form_fields: dict[
420 tuple[str, str],
421 dict[str, list[dict[str, object]]],
422 ],
423 entity_key: tuple[str, str],
424 ordered_properties: list[str | None],
425 ordered_form_fields: OrderedDict[
426 tuple[str, str],
427 OrderedDict[str, list[dict[str, object]]],
428 ],
429) -> None:
430 ordered_form_fields[entity_key] = OrderedDict()
431 for prop in ordered_properties:
432 if prop in form_fields[entity_key]:
433 ordered_form_fields[entity_key][prop] = form_fields[entity_key][prop]
434 # Aggiungi le proprietà rimanenti non specificate nell'ordine
435 for prop in form_fields[entity_key]:
436 if prop not in ordered_properties:
437 ordered_form_fields[entity_key][prop] = form_fields[entity_key][prop]
440def order_form_fields(
441 form_fields: dict[
442 tuple[str, str],
443 dict[str, list[dict[str, object]]],
444 ],
445 display_rules: list[dict[str, object]] | None,
446) -> (
447 OrderedDict[
448 tuple[str, str],
449 OrderedDict[str, list[dict[str, object]]],
450 ]
451 | dict[
452 tuple[str, str],
453 dict[str, list[dict[str, object]]],
454 ]
455):
456 """
457 Ordina i campi del form secondo le regole di visualizzazione.
459 Argomenti:
460 form_fields (dict): I campi del form con possibili modifiche dalle regole di
461 visualizzazione.
463 Restituisce:
464 OrderedDict: I campi del form ordinati.
465 """
466 ordered_form_fields = OrderedDict()
467 if not display_rules:
468 return form_fields
469 for rule in display_rules:
470 target = cast("dict[str, str]", rule.get("target", {}))
471 entity_class = target.get("class")
472 entity_shape = target.get("shape")
473 ordered_properties = _get_ordered_properties_from_rule(rule)
474 matching_keys = _find_matching_entity_keys(
475 form_fields, entity_class, entity_shape
476 )
477 for key in matching_keys:
478 _order_entity_fields(
479 form_fields, key, ordered_properties, ordered_form_fields
480 )
481 return ordered_form_fields
484def apply_display_rules(
485 shacl: Graph,
486 form_fields: dict[
487 tuple[str, str],
488 dict[str, list[dict[str, object]]],
489 ],
490 display_rules: list[dict[str, object]],
491) -> dict[
492 tuple[str, str],
493 dict[str, list[dict[str, object]]],
494]:
495 """
496 Applica le regole di visualizzazione ai campi del form.
498 Argomenti:
499 form_fields (dict): I campi del form iniziali estratti dalle shape SHACL.
501 Restituisce:
502 dict: I campi del form dopo aver applicato le regole di visualizzazione.
503 """
504 for rule in display_rules:
505 target = cast("dict[str, str]", rule.get("target", {}))
506 entity_class = target.get("class")
507 entity_shape = target.get("shape")
509 # Handle different cases based on available target information
510 # Case 1: Both class and shape are specified (exact match)
511 if entity_class and entity_shape:
512 entity_key = (entity_class, entity_shape)
513 if entity_key in form_fields:
514 apply_rule_to_entity(shacl, form_fields, entity_key, rule)
515 # Case 2: Only class is specified (apply to all matching classes)
516 elif entity_class:
517 for key in list(form_fields.keys()):
518 if key[0] == entity_class: # Check if class part of tuple matches
519 apply_rule_to_entity(shacl, form_fields, key, rule)
520 # Case 3: Only shape is specified (apply to all matching shapes)
521 elif entity_shape:
522 for key in list(form_fields.keys()):
523 if key[1] == entity_shape: # Check if shape part of tuple matches
524 apply_rule_to_entity(shacl, form_fields, key, rule)
525 return form_fields
528def apply_rule_to_entity(
529 shacl: Graph,
530 form_fields: dict[
531 tuple[str, str],
532 dict[str, list[dict[str, object]]],
533 ],
534 entity_key: tuple[str, str],
535 rule: dict[str, object],
536) -> None:
537 """
538 Apply a display rule to a specific entity key.
540 Args:
541 shacl: The SHACL graph
542 form_fields: The form fields dictionary
543 entity_key: The entity key tuple (class, shape)
544 rule: The display rule to apply
545 """
546 display_props = cast("list[dict[str, object]]", rule.get("displayProperties", []))
547 for prop in display_props:
548 prop_uri = prop.get("property") or prop.get("virtual_property")
549 if prop_uri and prop_uri in form_fields[entity_key]:
550 for field_info in form_fields[entity_key][str(prop_uri)]:
551 add_display_information(field_info, prop)
552 if "nestedShape" in field_info:
553 target = cast("dict[str, str]", rule.get("target", {}))
554 apply_display_rules_to_nested_shapes(
555 cast("list[dict[str, object]]", field_info["nestedShape"]),
556 prop,
557 target.get("shape"),
558 )
559 if "or" in field_info:
560 target = cast("dict[str, str]", rule.get("target", {}))
561 for or_field in cast("list[dict[str, object]]", field_info["or"]):
562 apply_display_rules_to_nested_shapes(
563 [or_field], field_info, target.get("shape")
564 )
565 if "intermediateRelation" in prop:
566 handle_intermediate_relation(shacl, field_info, prop)
567 if "displayRules" in prop:
568 handle_sub_display_rules(
569 shacl,
570 form_fields,
571 entity_key,
572 form_fields[entity_key][str(prop_uri)],
573 prop,
574 )
577def apply_display_rules_to_nested_shapes( # noqa: C901
578 nested_fields: list[dict[str, object]],
579 parent_prop: dict[str, object],
580 shape_uri: str | None,
581) -> list[dict[str, object]]:
582 """Apply display rules to nested shapes."""
583 if not nested_fields:
584 return []
586 # Handle case where parent_prop is not a dictionary
587 if not isinstance(parent_prop, dict):
588 return nested_fields
590 # Create a new list to avoid modifying the original
591 result_fields = []
592 for field in nested_fields:
593 # Create a copy of the field to avoid modifying the original
594 new_field = field.copy()
595 result_fields.append(new_field)
597 display_rules = cast("list[dict[str, object]]", parent_prop.get("displayRules", []))
598 for rule in display_rules:
599 if rule.get("shape") == shape_uri and "nestedDisplayRules" in rule:
600 nested_display_rules = cast(
601 "list[dict[str, object]]", rule["nestedDisplayRules"]
602 )
603 for field in result_fields:
604 for nested_rule in nested_display_rules:
605 field_key = field.get("predicate", field.get("uri"))
606 if field_key == nested_rule["property"]:
607 # Apply display properties from the rule to the field
608 for key, value in nested_rule.items():
609 if key != "property":
610 field[key] = value
611 break
613 return result_fields
616def determine_input_type(datatype: str | None) -> str:
617 """
618 Determina il tipo di input appropriato basato sul datatype XSD.
619 """
620 if not datatype:
621 return "text"
623 datatype = str(datatype)
624 datatype_to_input = {
625 "http://www.w3.org/2001/XMLSchema#string": "text",
626 "http://www.w3.org/2001/XMLSchema#integer": "number",
627 "http://www.w3.org/2001/XMLSchema#decimal": "number",
628 "http://www.w3.org/2001/XMLSchema#float": "number",
629 "http://www.w3.org/2001/XMLSchema#double": "number",
630 "http://www.w3.org/2001/XMLSchema#boolean": "checkbox",
631 "http://www.w3.org/2001/XMLSchema#date": "date",
632 "http://www.w3.org/2001/XMLSchema#time": "time",
633 "http://www.w3.org/2001/XMLSchema#dateTime": "datetime-local",
634 "http://www.w3.org/2001/XMLSchema#anyURI": "url",
635 "http://www.w3.org/2001/XMLSchema#email": "email",
636 }
637 return datatype_to_input.get(datatype, "text")
640def add_display_information(
641 field_info: dict[str, object],
642 prop: dict[str, object],
643) -> None:
644 """
645 Aggiunge informazioni di visualizzazione dal display_rules ad un campo.
647 Argomenti:
648 field_info (dict): Le informazioni del campo da aggiornare.
649 prop (dict): Le informazioni della proprietà dalle display_rules.
650 """
651 if "displayName" in prop:
652 field_info["displayName"] = prop["displayName"]
653 if "shouldBeDisplayed" in prop:
654 field_info["shouldBeDisplayed"] = prop.get("shouldBeDisplayed", True)
655 if "orderedBy" in prop:
656 field_info["orderedBy"] = prop["orderedBy"]
657 if "inputType" in prop:
658 field_info["inputType"] = prop["inputType"]
659 if "supportsSearch" in prop:
660 field_info["supportsSearch"] = prop["supportsSearch"]
661 if "minCharsForSearch" in prop:
662 field_info["minCharsForSearch"] = prop["minCharsForSearch"]
663 if "searchTarget" in prop:
664 field_info["searchTarget"] = prop["searchTarget"]
667def handle_intermediate_relation(
668 shacl: Graph,
669 field_info: dict[str, object],
670 prop: dict[str, object],
671) -> None:
672 """
673 Processa 'intermediateRelation' nelle display_rules e aggiorna il campo.
675 Argomenti:
676 field_info (dict): Le informazioni del campo da aggiornare.
677 prop (dict): Le informazioni della proprietà dalle display_rules.
678 """
679 intermediate_relation = cast("dict[str, str]", prop["intermediateRelation"])
680 target_entity_type = intermediate_relation["targetEntityType"]
681 intermediate_class = intermediate_relation["class"]
683 connecting_property_query = prepareQuery(
684 """
685 SELECT ?property
686 WHERE {
687 ?shape sh:targetClass ?intermediateClass ;
688 sh:property ?propertyShape .
689 ?propertyShape sh:path ?property ;
690 sh:node ?targetNode .
691 ?targetNode sh:targetClass ?targetClass.
692 }
693 """,
694 initNs={"sh": "http://www.w3.org/ns/shacl#"},
695 )
697 connecting_property_results = shacl.query(
698 connecting_property_query,
699 initBindings={
700 "intermediateClass": URIRef(intermediate_class),
701 "targetClass": URIRef(target_entity_type),
702 },
703 )
705 connecting_property = next(
706 (str(row.property) for row in select_results(connecting_property_results)), None
707 )
709 intermediate_properties = {}
710 target_shape = None
711 if "nestedShape" in field_info:
712 for nested_field in cast("list[dict[str, object]]", field_info["nestedShape"]):
713 if (
714 nested_field.get("uri") == connecting_property
715 and "nestedShape" in nested_field
716 ) and "nestedShape" in nested_field:
717 for target_field in cast(
718 "list[dict[str, object]]", nested_field["nestedShape"]
719 ):
720 uri = target_field.get("uri")
721 if uri:
722 if uri not in intermediate_properties:
723 intermediate_properties[uri] = []
724 intermediate_properties[uri].append(target_field)
725 if target_field.get("subjectShape"):
726 target_shape = target_field["subjectShape"]
728 field_info["intermediateRelation"] = {
729 "class": intermediate_class,
730 "targetEntityType": target_entity_type,
731 "targetShape": target_shape,
732 "connectingProperty": connecting_property,
733 "properties": intermediate_properties,
734 }
737def handle_sub_display_rules(
738 shacl: Graph,
739 form_fields: dict[
740 tuple[str, str],
741 dict[str, list[dict[str, object]]],
742 ],
743 entity_key: tuple[str, str],
744 field_info_list: list[dict[str, object]],
745 prop: dict[str, object],
746) -> None:
747 """
748 Gestisce 'displayRules' nelle display_rules, applicando la regola corretta in base
749 allo shape.
751 Argomenti:
752 form_fields (dict): I campi del form da aggiornare.
753 entity_key (tuple): La chiave dell'entità (class, shape).
754 field_info_list (list): Le informazioni del campo originale.
755 prop (dict): Le informazioni della proprietà dalle display_rules.
756 """
757 new_field_info_list = []
758 entity_class = entity_key[0] if isinstance(entity_key, tuple) else entity_key
760 for original_field in field_info_list:
761 # Trova la display rule corrispondente allo shape del campo
762 sub_display_rules = cast("list[dict[str, object]]", prop["displayRules"])
763 matching_rule = next(
764 (
765 rule
766 for rule in sub_display_rules
767 if rule["shape"] == original_field["nodeShape"]
768 ),
769 None,
770 )
772 if matching_rule:
773 new_field = {
774 "entityType": entity_class,
775 "entityKey": entity_key, # Store the tuple key
776 "objectClass": original_field.get("objectClass"),
777 "uri": prop["property"],
778 "datatype": original_field.get("datatype"),
779 "min": original_field.get("min"),
780 "max": original_field.get("max"),
781 "hasValue": original_field.get("hasValue"),
782 "nodeShape": original_field.get("nodeShape"),
783 "nodeShapes": original_field.get("nodeShapes"),
784 "subjectShape": original_field.get("subjectShape"),
785 "nestedShape": original_field.get("nestedShape"),
786 "displayName": matching_rule["displayName"],
787 "optionalValues": original_field.get("optionalValues", []),
788 "orderedBy": original_field.get("orderedBy"),
789 "or": original_field.get("or", []),
790 }
792 if "intermediateRelation" in original_field:
793 new_field["intermediateRelation"] = original_field[
794 "intermediateRelation"
795 ]
797 # Aggiungi proprietà aggiuntive dalla shape SHACL
798 if "shape" in matching_rule:
799 shape_uri = str(matching_rule["shape"])
800 additional_properties = extract_additional_properties(shacl, shape_uri)
801 if additional_properties:
802 new_field["additionalProperties"] = additional_properties
804 new_field_info_list.append(new_field)
805 else:
806 # Se non c'è una regola corrispondente, mantieni il campo originale
807 new_field_info_list.append(original_field)
809 form_fields[entity_key][str(prop["property"])] = new_field_info_list
812def get_shape_target_class(shacl: Graph, shape_uri: str) -> str | None:
813 query = prepareQuery(
814 """
815 SELECT ?targetClass
816 WHERE {
817 ?shape sh:targetClass ?targetClass .
818 }
819 """,
820 initNs={"sh": "http://www.w3.org/ns/shacl#"},
821 )
822 results = execute_shacl_query(shacl, query, {"shape": URIRef(shape_uri)})
823 for row in select_results(results):
824 return str(row.targetClass)
825 return None
828def get_object_class(shacl: Graph, shape_uri: str, predicate_uri: str) -> str | None:
829 query = prepareQuery(
830 """
831 SELECT DISTINCT ?targetClass
832 WHERE {
833 ?shape sh:property ?propertyShape .
834 ?propertyShape sh:path ?predicate .
835 {
836 # Caso 1: definizione diretta con sh:node
837 ?propertyShape sh:node ?nodeShape .
838 ?nodeShape sh:targetClass ?targetClass .
839 } UNION {
840 # Caso 2: definizione diretta con sh:class
841 ?propertyShape sh:class ?targetClass .
842 } UNION {
843 # Caso 3: definizione con sh:or che include node shapes
844 ?propertyShape sh:or ?orList .
845 ?orList rdf:rest*/rdf:first ?choice .
846 {
847 ?choice sh:node ?nodeShape .
848 ?nodeShape sh:targetClass ?targetClass .
849 } UNION {
850 ?choice sh:class ?targetClass .
851 }
852 }
853 }
854 """,
855 initNs={
856 "sh": "http://www.w3.org/ns/shacl#",
857 "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
858 },
859 )
861 results = execute_shacl_query(
862 shacl, query, {"shape": URIRef(shape_uri), "predicate": URIRef(predicate_uri)}
863 )
865 # Prendiamo il primo risultato valido
866 for row in select_results(results):
867 if row.targetClass:
868 return str(row.targetClass)
869 return None
872def extract_shacl_form_fields(
873 shacl: Graph | None,
874 display_rules: list[dict[str, object]] | None,
875 app: Flask,
876) -> (
877 dict[
878 tuple[str, str],
879 dict[str, list[dict[str, object]]],
880 ]
881 | defaultdict[
882 tuple[str, str],
883 dict[str, list[dict[str, object]]],
884 ]
885):
886 """
887 Estrae i campi del form dalle shape SHACL.
889 Args:
890 shacl: The SHACL graph
891 display_rules: The display rules configuration
892 app: Flask application instance
894 Returns:
895 defaultdict: A dictionary where the keys are tuples (class, shape) and the
896 values are dictionaries
897 of form fields with their properties.
898 """
899 if not shacl:
900 return {}
902 ctx = ShaclProcessingContext(
903 shacl=shacl,
904 display_rules=display_rules,
905 app=app,
906 processed_shapes=set(),
907 )
908 results = execute_shacl_query(shacl, COMMON_SPARQL_QUERY)
909 return process_query_results(
910 ctx,
911 select_results(results),
912 depth=0,
913 )
916def execute_shacl_query(
917 shacl: Graph,
918 query: Query,
919 init_bindings: dict[str, URIRef] | None = None,
920) -> Result:
921 """
922 Esegue una query SPARQL sul grafo SHACL con eventuali binding iniziali.
924 Args:
925 shacl (Graph): The SHACL graph on which to execute the query.
926 query (PreparedQuery): The prepared SPARQL query.
927 init_bindings (dict): Initial bindings for the query.
929 Returns:
930 Result: The query results.
931 """
932 if init_bindings:
933 return shacl.query(query, initBindings=init_bindings)
934 return shacl.query(query)
937def extract_additional_properties(shacl: Graph, shape_uri: str) -> dict[str, str]:
938 """
939 Estrae proprietà aggiuntive da una shape SHACL.
941 Argomenti:
942 shape_uri (str): L'URI della shape SHACL.
944 Restituisce:
945 dict: Un dizionario delle proprietà aggiuntive.
946 """
947 additional_properties_query = prepareQuery(
948 """
949 SELECT ?predicate ?has_value
950 WHERE {
951 ?shape a sh:NodeShape ;
952 sh:property ?property .
953 ?property sh:path ?predicate ;
954 sh:hasValue ?has_value .
955 }
956 """,
957 initNs={"sh": "http://www.w3.org/ns/shacl#"},
958 )
960 additional_properties_results = shacl.query(
961 additional_properties_query,
962 initBindings={"shape": URIRef(shape_uri)},
963 )
965 additional_properties = {}
966 for row in select_results(additional_properties_results):
967 predicate = str(row.predicate)
968 has_value = str(row.has_value)
969 additional_properties[predicate] = has_value
971 return additional_properties