Coverage for heritrace / utils / shacl_utils.py: 96%
221 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from collections.abc import Iterable
6from weakref import WeakKeyDictionary
8from flask import Flask
9from rdflib import RDF, Graph
10from SPARQLWrapper import JSON
12from heritrace.extensions import get_form_fields, get_shacl_graph, get_sparql
13from heritrace.sparql import get_sparql_bindings, select_results
14from heritrace.utils.display_rules_utils import get_class_priority
15from heritrace.utils.filters import format_uri_as_readable
16from heritrace.utils.shacl_display import (
17 ShaclProcessingContext,
18 apply_display_rules,
19 extract_shacl_form_fields,
20 order_form_fields,
21 process_nested_shapes,
22)
23from heritrace.utils.virtual_properties import get_virtual_properties_for_entity
# Per-graph memoization tables for SHACL lookups. The graph is held through a
# weak reference, so a graph's entries vanish once the graph is collected.

# graph -> {class URI: [URIs of the shapes targeting that class]}
_class_shapes_cache: WeakKeyDictionary[
    Graph, dict[str, list[str]]
] = WeakKeyDictionary()

# graph -> {shape URI: {property URIs declared by the shape}}
_shape_properties_cache: WeakKeyDictionary[
    Graph, dict[str, set[str]]
] = WeakKeyDictionary()

# graph -> {shape URI: [(property URI, required sh:hasValue value), ...]}
_hasvalue_constraints_cache: WeakKeyDictionary[
    Graph, dict[str, list[tuple[str, str]]]
] = WeakKeyDictionary()
def get_form_fields_from_shacl(
    shacl: Graph | None, display_rules: list[dict] | None, app: Flask
) -> dict:
    """
    Build the complete form-field configuration from a SHACL graph.

    The pipeline is: extract the raw fields, expand nested node shapes, apply
    the display rules (when given), fill in missing display names, attach the
    virtual properties and finally order the result.

    Args:
        shacl: The SHACL graph; a falsy value (e.g. None) yields no fields
        display_rules: The display rules configuration, if any
        app: Flask application instance

    Returns:
        The form fields as returned by order_form_fields, keyed by
        (class, shape) tuples, or an empty dict when no SHACL graph is given.
    """
    if not shacl:
        return {}

    # Raw fields straight from the SHACL shapes.
    fields_by_entity = extract_shacl_form_fields(shacl, display_rules, app=app)

    # A single context is shared by every expansion so that they all see the
    # same set of already-processed shapes.
    nesting_ctx = ShaclProcessingContext(
        shacl=shacl,
        display_rules=display_rules,
        app=app,
        processed_shapes=set(),
    )
    for predicate_fields in fields_by_entity.values():
        for field_entries in predicate_fields.values():
            for entry in field_entries:
                node_shape = entry.get("nodeShape")
                if node_shape:
                    entry["nestedShape"] = process_nested_shapes(
                        nesting_ctx,
                        str(node_shape),
                    )

    if display_rules:
        fields_by_entity = apply_display_rules(
            shacl, fields_by_entity, display_rules
        )

    # Fields not covered by a display rule still need a human-readable label.
    ensure_display_names(fields_by_entity)

    # Virtual properties are added before ordering so they get ordered too.
    with_virtual = add_virtual_properties_to_form_fields_internal(fields_by_entity)
    return order_form_fields(with_virtual, display_rules)
90def _apply_field_overrides(shape_data: dict, override: dict) -> dict:
91 nested_field = shape_data.copy()
92 if "shouldBeDisplayed" in override:
93 nested_field["shouldBeDisplayed"] = override["shouldBeDisplayed"]
94 if "displayName" in override:
95 nested_field["displayName"] = override["displayName"]
96 if "value" in override:
97 nested_field["hasValue"] = override["value"]
98 nested_field["nestedShape"] = []
99 return nested_field
def _build_nested_shape_entry(vp: dict, enhanced_form_fields: dict) -> list[dict]:
    """
    Collect the nested fields shown for a virtual property's target entity.

    The target class and shape are read from vp["implementedVia"]["target"];
    when only the class is given, the shape is resolved through
    determine_shape_for_classes. The fields of the matching form-field entry
    are copied, overridden according to
    vp["implementedVia"]["fieldOverrides"], and kept only when they are meant
    to be displayed.

    Args:
        vp: Virtual property configuration
        enhanced_form_fields: Form fields to look the target entity up in

    Returns:
        The list of nested field dicts, empty when no target entry matches.
    """
    via = vp.get("implementedVia", {})
    target_spec = via.get("target", {})
    target_class = target_spec.get("class")
    target_shape = target_spec.get("shape")

    if target_class and not target_shape:
        target_shape = determine_shape_for_classes([target_class])

    target_key = find_matching_form_field(
        class_uri=target_class,
        shape_uri=target_shape,
        form_fields=enhanced_form_fields,
    )
    if not target_key:
        return []

    overrides = via.get("fieldOverrides", {})
    visible_fields: list[dict] = []
    for prop_uri, details_list in enhanced_form_fields.get(target_key, {}).items():
        for details in details_list:
            candidate = (
                _apply_field_overrides(details, overrides[prop_uri])
                if prop_uri in overrides
                else details.copy()
            )
            # Fields are displayed unless explicitly switched off.
            if candidate.get("shouldBeDisplayed", True):
                visible_fields.append(candidate)

    return visible_fields
def add_virtual_properties_to_form_fields_internal(form_fields: dict) -> dict:
    """
    Return a copy of the form fields enriched with virtual properties.

    For every (class, shape) key the virtual properties configured for that
    entity are looked up and each displayable one is added as a single-entry
    field list stored under its name.

    Args:
        form_fields: Form fields keyed by (class, shape) tuples; may be empty
            or None

    Returns:
        A new mapping holding the original fields plus the virtual ones. The
        input mapping and its per-entity mappings are not modified.
    """
    if not form_fields:
        return {}

    enhanced_form_fields = form_fields.copy()
    # A shallow copy alone would share the per-entity mappings with the
    # caller, so inserting virtual properties below would silently alter the
    # caller's data. Copy one level deeper to keep the input intact.
    for entity_key, predicate_fields in form_fields.items():
        enhanced_form_fields[entity_key] = predicate_fields.copy()

    for entity_key in enhanced_form_fields:
        entity_class, entity_shape = entity_key

        virtual_properties = get_virtual_properties_for_entity(
            entity_class, entity_shape
        )
        if not virtual_properties:
            continue

        for display_name, prop_config in virtual_properties:
            if not prop_config.get("shouldBeDisplayed", True):
                continue

            nested_shape_list = _build_nested_shape_entry(
                prop_config, enhanced_form_fields
            )

            virtual_form_field = {
                "displayName": prop_config.get("displayName", display_name),
                "uri": display_name,
                "is_virtual": True,
                "min": 0,
                "max": None,
                "datatypes": [],
                "optionalValues": [],
                "orderedBy": None,
                "nodeShape": None,
                "subjectClass": None,
                "subjectShape": None,
                "objectClass": None,
                "entityType": None,
                "nestedShape": nested_shape_list,
                "or": None,
            }

            enhanced_form_fields[entity_key][display_name] = [virtual_form_field]

    return enhanced_form_fields
def _get_shapes_for_class(shacl_graph: Graph, class_uri: str) -> list[str]:
    """
    Return the URIs of the shapes whose sh:targetClass is the given class.

    Results are memoized per graph and class URI.

    Args:
        shacl_graph: The SHACL graph
        class_uri: URI of the class to look shapes up for

    Returns:
        List of shape URIs (empty when no shape targets the class)
    """
    cached_by_class = _class_shapes_cache.setdefault(shacl_graph, {})
    if class_uri in cached_by_class:
        return cached_by_class[class_uri]

    target_class_query = f"""
        SELECT DISTINCT ?shape WHERE {{
            ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
        }}
    """
    rows = select_results(shacl_graph.query(target_class_query))
    shape_uris = [str(row.shape) for row in rows]
    cached_by_class[class_uri] = shape_uris
    return shape_uris
def determine_shape_for_classes(class_list: list[str]) -> str | None:
    """
    Pick the SHACL shape to use for a list of class URIs.

    Every shape targeting one of the classes is a candidate; the winner is
    chosen by _find_highest_priority_shape.

    Args:
        class_list: List of class URIs to find shapes for

    Returns:
        The selected shape URI, or None when there is no SHACL graph or no
        shape targets any of the classes
    """
    shacl_graph = get_shacl_graph()
    if not shacl_graph:
        return None

    class_shape_pairs = [
        (class_uri, shape)
        for class_uri in class_list
        for shape in _get_shapes_for_class(shacl_graph, class_uri)
    ]
    return _find_highest_priority_shape(class_shape_pairs)
def determine_shape_for_entity_triples(entity_triples: Iterable) -> str | None:
    """
    Determine the most appropriate SHACL shape for an entity based on its triples.

    Candidate shapes are those targeting one of the entity's rdf:type classes.
    When several candidates exist they are ranked by, in order:
    1. the number of sh:hasValue constraints the entity satisfies
    2. the number of shape properties the entity actually uses
    3. the class priority (a lower priority number wins)

    Args:
        entity_triples: Iterable of (subject, predicate, object) triples for
            the entity. It is buffered once, so one-shot iterables such as
            generators are supported.

    Returns:
        The most appropriate shape URI, or None if there is no SHACL graph,
        the entity has no rdf:type, or no shape targets its classes
    """
    shacl_graph = get_shacl_graph()
    if not shacl_graph:
        return None

    # The triples are walked here and again for every candidate shape; a
    # generator would be exhausted after the first pass and every hasValue
    # score would silently drop to 0, so materialize them up front.
    triples = list(entity_triples)

    rdf_type = str(RDF.type)
    entity_classes = []
    entity_properties = set()
    for _subject, predicate, obj in triples:
        predicate_uri = str(predicate)
        if predicate_uri == rdf_type:
            entity_classes.append(str(obj))
        entity_properties.add(predicate_uri)

    if not entity_classes:
        return None

    candidate_shapes = [
        (class_uri, shape)
        for class_uri in entity_classes
        for shape in _get_shapes_for_class(shacl_graph, class_uri)
    ]

    if not candidate_shapes:
        return None

    if len(candidate_shapes) == 1:
        return candidate_shapes[0][1]

    shape_scores = {}
    for class_uri, shape_uri in candidate_shapes:
        shape_properties = _get_shape_properties(shacl_graph, shape_uri)
        property_matches = len(entity_properties.intersection(shape_properties))

        hasvalue_matches = _check_hasvalue_constraints(
            shacl_graph, shape_uri, triples
        )

        priority = get_class_priority((class_uri, shape_uri))

        # Tuples compare element by element: hasValue matches dominate, then
        # property matches; the priority is negated so lower numbers win.
        shape_scores[shape_uri] = (hasvalue_matches, property_matches, -priority)

    return max(shape_scores, key=shape_scores.__getitem__)
283def _find_highest_priority_shape(
284 class_shape_pairs: list[tuple[str, str]],
285) -> str | None:
286 """
287 Helper function to find the shape with the highest priority from a list of
288 (class_uri, shape) pairs.
290 Args:
291 class_shape_pairs: List of tuples (class_uri, shape)
293 Returns:
294 The shape with the highest priority, or None if the list is empty
295 """
296 highest_priority = float("inf")
297 highest_priority_shape = None
299 for class_uri, shape in class_shape_pairs:
300 entity_key = (class_uri, shape)
301 priority = get_class_priority(entity_key)
302 if priority < highest_priority:
303 highest_priority = priority
304 highest_priority_shape = shape
306 return highest_priority_shape
def _get_shape_properties(shacl_graph: Graph, shape_uri: str) -> set:
    """
    Return the property paths declared by a SHACL shape.

    A property is any sh:path reachable through one of the shape's
    sh:property nodes. Results are memoized per graph and shape URI.

    Args:
        shacl_graph: The SHACL graph
        shape_uri: URI of the shape to analyze

    Returns:
        Set of property URIs defined in the shape
    """
    cached_by_shape = _shape_properties_cache.setdefault(shacl_graph, {})
    if shape_uri in cached_by_shape:
        return cached_by_shape[shape_uri]

    properties_query = f"""
        PREFIX sh: <http://www.w3.org/ns/shacl#>
        SELECT DISTINCT ?property WHERE {{
            <{shape_uri}> sh:property ?propertyShape .
            ?propertyShape sh:path ?property .
        }}
    """
    rows = select_results(shacl_graph.query(properties_query))
    property_uris = {str(row.property) for row in rows}
    cached_by_shape[shape_uri] = property_uris
    return property_uris
def _get_hasvalue_constraints(
    shacl_graph: Graph, shape_uri: str
) -> list[tuple[str, str]]:
    """
    Return the sh:hasValue constraints declared by a SHACL shape.

    Results are memoized per graph and shape URI.

    Args:
        shacl_graph: The SHACL graph
        shape_uri: URI of the shape to analyze

    Returns:
        List of (property URI, required value) pairs, both as strings
    """
    cached_by_shape = _hasvalue_constraints_cache.setdefault(shacl_graph, {})
    if shape_uri in cached_by_shape:
        return cached_by_shape[shape_uri]

    constraints_query = f"""
        PREFIX sh: <http://www.w3.org/ns/shacl#>
        SELECT DISTINCT ?property ?value WHERE {{
            <{shape_uri}> sh:property ?propertyShape .
            ?propertyShape sh:path ?property .
            ?propertyShape sh:hasValue ?value .
        }}
    """
    rows = select_results(shacl_graph.query(constraints_query))
    required_pairs = [(str(row.property), str(row.value)) for row in rows]
    cached_by_shape[shape_uri] = required_pairs
    return required_pairs
def _check_hasvalue_constraints(
    shacl_graph: Graph, shape_uri: str, entity_triples: Iterable
) -> int:
    """
    Count the sh:hasValue constraints of a shape that an entity satisfies.

    A constraint is satisfied when the entity has a triple whose predicate and
    object, compared as strings, equal the constrained property and value.

    Args:
        shacl_graph: The SHACL graph
        shape_uri: URI of the shape to check
        entity_triples: Triples (subject, predicate, object) of the entity

    Returns:
        Number of hasValue constraints satisfied by the entity
    """
    required_pairs = _get_hasvalue_constraints(shacl_graph, shape_uri)
    if not required_pairs:
        return 0

    # Index the entity's (predicate, object) pairs for constant-time lookup.
    present_pairs = {
        (str(predicate), str(obj)) for _, predicate, obj in entity_triples
    }
    return sum(1 for pair in required_pairs if pair in present_pairs)
def ensure_display_names(form_fields: dict) -> None:
    """
    Give every form field a displayName, modifying the fields in place.

    Fields that already carry a non-empty displayName are left alone; the
    others receive the predicate URI rendered by format_uri_as_readable.

    Args:
        form_fields: Dictionary of form fields to process
    """
    for predicate_fields in form_fields.values():
        for predicate_uri, field_entries in predicate_fields.items():
            unnamed = (
                entry for entry in field_entries if not entry.get("displayName")
            )
            for entry in unnamed:
                entry["displayName"] = format_uri_as_readable(predicate_uri)
406def find_matching_form_field(
407 class_uri: str | None = None,
408 shape_uri: str | None = None,
409 form_fields: dict | None = None,
410) -> tuple[str, str] | None:
411 """
412 Find the most appropriate form field configuration for a given class and/or shape.
413 At least one of class_uri or shape_uri must be provided.
415 Args:
416 class_uri: Optional URI of the class
417 shape_uri: Optional URI of the shape
418 form_fields: Optional dictionary of form fields to search in, defaults to global
419 form_fields
421 Returns:
422 The matching form field key (class_uri, shape_uri) or None if no match is found
423 """
424 if not form_fields:
425 form_fields = get_form_fields()
427 if not form_fields:
428 return None
430 class_match = None
431 shape_match = None
433 for field_key in form_fields:
434 field_class_uri = field_key[0]
435 field_shape_uri = field_key[1]
437 # Case 1: Both class and shape match (exact match)
438 if (
439 class_uri
440 and shape_uri
441 and field_class_uri == str(class_uri)
442 and field_shape_uri == str(shape_uri)
443 ):
444 return field_key
446 # Case 2: Only class matches (and form field has no shape constraint)
447 if class_uri and field_class_uri == str(class_uri) and field_shape_uri is None:
448 class_match = field_key
450 # Case 3: Only shape matches (and form field has no class constraint)
451 elif (
452 shape_uri and field_shape_uri == str(shape_uri) and field_class_uri is None
453 ):
454 shape_match = field_key
456 # Case 4: Only class matches (even if form field has a shape)
457 elif class_uri and field_class_uri == str(class_uri) and not class_match:
458 class_match = field_key
460 # Return the best match based on specificity
461 # Shape rules typically have higher specificity, so prefer them
462 if shape_match:
463 return shape_match
464 if class_match:
465 return class_match
467 return None
470def _find_entity_position_in_order_map(entity_uri: str, order_map: dict) -> int | None:
471 """
472 Helper function to find entity position in an order map.
474 This function handles the case where there might be multiple independent ordered
475 chains
476 within the same predicate relationship. Each chain has its own starting element and
477 follows a linked-list structure where each entity points to the next one.
479 Args:
480 entity_uri: URI of the entity to find position for
481 order_map: Dictionary mapping entities to their next entity in sequence.
482 Key = current entity URI, Value = next entity URI (or None for last
483 element)
484 Example: {'entity1': 'entity2', 'entity2': 'entity3', 'entity3':
485 None,
486 'entity4': 'entity5', 'entity5': None}
487 This represents two chains: [entity1 -> entity2 -> entity3] and
488 [entity4 -> entity5]
490 Returns:
491 1-based position in the sequence, or None if not found
492 """
493 # Find all starting elements of ordered chains.
494 # A start element is one that appears as a key in the order_map but never as a
495 # value,
496 # meaning no other entity points to it (it's the head of a chain).
497 start_elements = set(order_map.keys()) - {
498 v for v in order_map.values() if v is not None
499 }
501 if not start_elements:
502 # No valid starting points found - this shouldn't happen in well-formed data
503 return None
505 # Since there can be multiple independent ordered chains, we need to check each one
506 # to find which chain contains our target entity
507 for start_element in start_elements:
508 # Build the complete sequence for this chain by following the linked-list
509 # structure
510 sequence = []
511 current_element = start_element
513 # Follow the chain from start to end
514 while current_element in order_map:
515 sequence.append(current_element)
516 # Move to the next element in the chain (or None if we've reached the end)
517 current_element = order_map[current_element]
519 # Check if our target entity is in this particular chain
520 try:
521 # If found, return its 1-based position within this chain
522 return (
523 sequence.index(entity_uri) + 1
524 ) # Convert from 0-based to 1-based indexing
525 except ValueError:
526 # Entity not found in this chain, try the next one
527 continue
529 # Entity was not found in any of the ordered chains
530 return None
def get_entity_position_in_sequence(
    entity_uri: str,
    subject_uri: str,
    predicate_uri: str,
    order_property: str,
    snapshot: Graph | None = None,
) -> int | None:
    """
    Get the position of an entity in an ordered sequence.

    The entities linked to the subject through the predicate are fetched
    together with the entity each one points to via the order property; the
    resulting linked list is then walked to locate the requested entity.

    Args:
        entity_uri: URI of the entity to find position for
        subject_uri: URI of the subject that has the ordered property
        predicate_uri: URI of the ordered predicate
        order_property: URI of the property that defines the ordering
        snapshot: Optional graph snapshot for historical queries. When given
            (even if empty) it is queried instead of the SPARQL endpoint.

    Returns:
        1-based position in the sequence, or None if not found
    """
    # COALESCE turns a missing successor into the "NONE" marker so that every
    # row has a bound ?nextValue.
    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{subject_uri}> <{predicate_uri}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """

    # Compare against None explicitly: an empty rdflib Graph is falsy, and a
    # truthiness test would answer a historical query about an empty snapshot
    # with data from the live endpoint.
    if snapshot is not None:
        links = [
            (str(row[0]), str(row[1]))
            for row in select_results(snapshot.query(order_query))
        ]
    else:
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        links = [
            (binding["orderedEntity"]["value"], binding["nextValue"]["value"])
            for binding in get_sparql_bindings(sparql.query().convert())
        ]

    order_map = {
        ordered_entity: None if next_value == "NONE" else next_value
        for ordered_entity, next_value in links
    }
    return _find_entity_position_in_order_map(entity_uri, order_map)