Coverage for heritrace / utils / shacl_utils.py: 96%
206 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from typing import List, Optional, Tuple
7from flask import Flask
8from heritrace.extensions import get_shacl_graph, get_sparql
9from heritrace.utils.display_rules_utils import get_class_priority
10from heritrace.utils.shacl_display import (apply_display_rules,
11 extract_shacl_form_fields,
12 order_form_fields,
13 process_nested_shapes)
14from rdflib import RDF, Graph
15from SPARQLWrapper import JSON
def get_form_fields_from_shacl(shacl: Graph, display_rules: List[dict], app: Flask):
    """
    Build the form-field configuration described by a SHACL graph.

    The pipeline runs in a fixed order: extract the raw fields from the
    shapes, resolve nested shapes, apply the display rules (only when some
    are configured), fill in missing display names, add virtual properties
    and finally order the fields.

    Args:
        shacl: The SHACL graph
        display_rules: The display rules configuration
        app: Flask application instance

    Returns:
        dict: The value produced by order_form_fields, keyed by
        (class, shape) tuples, each mapping to the form fields of that
        entity type. A plain empty dict is returned when `shacl` is falsy.
    """
    if not shacl:
        return dict()

    # Step 1: raw form fields straight from the SHACL shapes
    form_fields = extract_shacl_form_fields(shacl, display_rules, app=app)

    # Step 2: resolve the nested shape of every field that references a node shape.
    # One `processed_shapes` set is shared by all the calls below.
    processed_shapes = set()
    for predicate_fields in form_fields.values():
        for field_list in predicate_fields.values():
            for field_info in field_list:
                node_shape = field_info.get("nodeShape")
                if not node_shape:
                    continue
                field_info["nestedShape"] = process_nested_shapes(
                    shacl,
                    display_rules,
                    node_shape,
                    app=app,
                    processed_shapes=processed_shapes,
                )

    # Step 3: display rules are optional
    if display_rules:
        form_fields = apply_display_rules(shacl, form_fields, display_rules)

    # Step 3.5: fields not covered by a display rule still need a displayName
    ensure_display_names(form_fields)

    # Step 4: virtual properties are added before ordering so they get ordered too
    with_virtual_properties = add_virtual_properties_to_form_fields_internal(form_fields)

    # Step 5: final ordering
    return order_form_fields(with_virtual_properties, display_rules)
def add_virtual_properties_to_form_fields_internal(form_fields: dict) -> dict:
    """
    Attach one virtual form field per displayable virtual property.

    For every entity key in `form_fields`, the virtual properties returned by
    get_virtual_properties_for_entity are turned into a field flagged with
    "is_virtual". Its nested shape is a copy of the fields of the target
    ("intermediate") entity, adjusted by the property's "fieldOverrides".

    Args:
        form_fields: The original form_fields dictionary, keyed by
            (class, shape) tuples

    Returns:
        Enhanced form_fields dictionary with virtual properties included

    Note:
        Only the outer dictionary is copied. The per-entity dictionaries are
        shared with `form_fields`, so the virtual fields are also visible
        through the argument after the call.
    """
    from heritrace.utils.virtual_properties import get_virtual_properties_for_entity

    enhanced_form_fields = form_fields.copy() if form_fields else {}

    for entity_key in enhanced_form_fields.keys():
        entity_class, entity_shape = entity_key
        virtual_properties = get_virtual_properties_for_entity(entity_class, entity_shape)

        # A falsy result (None or empty) simply yields no iterations.
        for display_name, prop_config in virtual_properties or ():
            if not prop_config.get("shouldBeDisplayed", True):
                continue

            implementation = prop_config.get("implementedVia", {})
            target = implementation.get("target", {})
            intermediate_class = target.get("class")
            specific_shape = target.get("shape")

            # No explicit shape configured: derive one from the target class.
            if intermediate_class and not specific_shape:
                specific_shape = determine_shape_for_classes([intermediate_class])

            intermediate_entity_key = find_matching_form_field(
                class_uri=intermediate_class,
                shape_uri=specific_shape,
                form_fields=enhanced_form_fields,
            )

            nested_shape_list = []
            if intermediate_entity_key:
                field_overrides = implementation.get("fieldOverrides", {})
                intermediate_fields = enhanced_form_fields.get(intermediate_entity_key, {})

                for nested_prop_uri, nested_details_list in intermediate_fields.items():
                    has_override = nested_prop_uri in field_overrides

                    for nested_details in nested_details_list:
                        # Shallow copy so overrides never touch the intermediate entity's own field.
                        nested_field = nested_details.copy()

                        if has_override:
                            override = field_overrides[nested_prop_uri]
                            for copied_key in ("shouldBeDisplayed", "displayName"):
                                if copied_key in override:
                                    nested_field[copied_key] = override[copied_key]
                            if "value" in override:
                                # A fixed value replaces whatever nested shape the field had.
                                nested_field["hasValue"] = override["value"]
                                nested_field["nestedShape"] = []

                        if nested_field.get("shouldBeDisplayed", True):
                            nested_shape_list.append(nested_field)

            # The virtual field is registered under its display name, which also serves as its "uri".
            enhanced_form_fields[entity_key][display_name] = [
                {
                    "displayName": prop_config.get("displayName", display_name),
                    "uri": display_name,
                    "is_virtual": True,
                    "min": 0,
                    "max": None,
                    "datatypes": [],
                    "optionalValues": [],
                    "orderedBy": None,
                    "nodeShape": None,
                    "subjectClass": None,
                    "subjectShape": None,
                    "objectClass": None,
                    "entityType": None,
                    "nestedShape": nested_shape_list,
                    "or": None,
                }
            ]

    return enhanced_form_fields
def determine_shape_for_classes(class_list: List[str]) -> Optional[str]:
    """
    Pick the best SHACL shape among those targeting any of the given classes.

    Every class is looked up in the SHACL graph through sh:targetClass; the
    resulting (class, shape) pairs are then ranked by
    _find_highest_priority_shape.

    Args:
        class_list: List of class URIs to find shapes for

    Returns:
        The most appropriate shape URI based on priority, or None if the
        SHACL graph is unavailable/empty or no shape targets these classes
    """
    shacl_graph = get_shacl_graph()
    if not shacl_graph:
        return None

    class_shape_pairs = []
    for class_uri in class_list:
        rows = shacl_graph.query(
            f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
            }}
        """
        )
        class_shape_pairs.extend((class_uri, str(row.shape)) for row in rows)

    return _find_highest_priority_shape(class_shape_pairs)
def determine_shape_for_entity_triples(entity_triples: list) -> Optional[str]:
    """
    Choose the SHACL shape that best describes an entity, given its triples.

    Candidate shapes are those whose sh:targetClass is one of the entity's
    rdf:type values. When several candidates exist they are ranked by a
    tuple score, compared element by element:
    1. number of sh:hasValue constraints satisfied by the entity
    2. number of shape properties that the entity actually uses
    3. class priority (a lower priority value ranks higher)

    Args:
        entity_triples: List of triples (subject, predicate, object) for the entity

    Returns:
        The most appropriate shape URI, or None if the SHACL graph is
        unavailable/empty, the entity has no rdf:type, or no shape targets
        its classes
    """
    shacl_graph = get_shacl_graph()
    if not shacl_graph:
        return None

    rdf_type_uri = str(RDF.type)
    entity_classes = []
    entity_properties = set()
    for _subject, predicate, obj in entity_triples:
        predicate_uri = str(predicate)
        # Every predicate (rdf:type included) counts as a property used by the entity.
        entity_properties.add(predicate_uri)
        if predicate_uri == rdf_type_uri:
            entity_classes.append(str(obj))

    if not entity_classes:
        return None

    candidate_shapes = []
    for class_uri in entity_classes:
        rows = shacl_graph.query(
            f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
            }}
        """
        )
        candidate_shapes.extend((class_uri, str(row.shape)) for row in rows)

    if not candidate_shapes:
        return None

    # A single candidate needs no scoring.
    if len(candidate_shapes) == 1:
        return candidate_shapes[0][1]

    # Scores are keyed by shape URI: a shape reached through several classes
    # keeps the score computed for the last of them.
    shape_scores = {}
    for class_uri, shape_uri in candidate_shapes:
        shape_properties = _get_shape_properties(shacl_graph, shape_uri)
        hasvalue_matches = _check_hasvalue_constraints(shacl_graph, shape_uri, entity_triples)
        priority = get_class_priority((class_uri, shape_uri))

        shape_scores[shape_uri] = (
            hasvalue_matches,
            len(entity_properties.intersection(shape_properties)),
            -priority,  # negated so that a lower priority value yields a higher score
        )

    return max(shape_scores, key=shape_scores.get)
254def _find_highest_priority_shape(class_shape_pairs: List[Tuple[str, str]]) -> Optional[str]:
255 """
256 Helper function to find the shape with the highest priority from a list of (class_uri, shape) pairs.
258 Args:
259 class_shape_pairs: List of tuples (class_uri, shape)
261 Returns:
262 The shape with the highest priority, or None if the list is empty
263 """
264 highest_priority = float('inf')
265 highest_priority_shape = None
267 for class_uri, shape in class_shape_pairs:
268 entity_key = (class_uri, shape)
269 priority = get_class_priority(entity_key)
270 if priority < highest_priority:
271 highest_priority = priority
272 highest_priority_shape = shape
274 return highest_priority_shape
277def _get_shape_properties(shacl_graph: Graph, shape_uri: str) -> set:
278 """
279 Extract all properties defined in a SHACL shape.
281 Args:
282 shacl_graph: The SHACL graph
283 shape_uri: URI of the shape to analyze
285 Returns:
286 Set of property URIs defined in the shape
287 """
288 properties = set()
290 query_string = f"""
291 PREFIX sh: <http://www.w3.org/ns/shacl#>
292 SELECT DISTINCT ?property WHERE {{
293 <{shape_uri}> sh:property ?propertyShape .
294 ?propertyShape sh:path ?property .
295 }}
296 """
298 results = shacl_graph.query(query_string)
299 for row in results:
300 properties.add(str(row.property))
302 return properties
305def _check_hasvalue_constraints(shacl_graph: Graph, shape_uri: str, entity_triples: list) -> int:
306 """
307 Check how many sh:hasValue constraints the entity satisfies for a given shape.
309 Args:
310 shacl_graph: The SHACL graph
311 shape_uri: URI of the shape to check
312 entity_triples: List of triples (subject, predicate, object) for the entity
314 Returns:
315 Number of hasValue constraints satisfied by the entity
316 """
317 # Get all hasValue constraints for this shape
318 query_string = f"""
319 PREFIX sh: <http://www.w3.org/ns/shacl#>
320 SELECT DISTINCT ?property ?value WHERE {{
321 <{shape_uri}> sh:property ?propertyShape .
322 ?propertyShape sh:path ?property .
323 ?propertyShape sh:hasValue ?value .
324 }}
325 """
327 results = shacl_graph.query(query_string)
328 constraints = [(str(row.property), str(row.value)) for row in results]
330 if not constraints:
331 return 0
333 # Create a set of (predicate, object) pairs from entity triples
334 entity_property_values = set()
335 for _, predicate, obj in entity_triples:
336 entity_property_values.add((str(predicate), str(obj)))
338 # Count how many constraints are satisfied
339 satisfied_constraints = 0
340 for property_uri, required_value in constraints:
341 if (property_uri, required_value) in entity_property_values:
342 satisfied_constraints += 1
344 return satisfied_constraints
def ensure_display_names(form_fields):
    """
    Give every form field a displayName, in place.

    Fields whose "displayName" is missing or falsy receive the readable form
    of their predicate URI, as produced by format_uri_as_readable. Fields
    that already have a display name are left untouched.

    Args:
        form_fields: Dictionary of form fields to process
    """
    from heritrace.utils.filters import format_uri_as_readable

    for predicates in form_fields.values():
        for predicate_uri, details_list in predicates.items():
            for field_info in details_list:
                if field_info.get("displayName"):
                    continue
                field_info["displayName"] = format_uri_as_readable(predicate_uri)
def find_matching_form_field(class_uri=None, shape_uri=None, form_fields=None):
    """
    Look up the form field key that best fits a class and/or a shape.
    At least one of class_uri or shape_uri should be provided; with neither,
    nothing can match and None is returned.

    Matching precedence:
    1. a key matching both class and shape is returned immediately
    2. otherwise a key matching the shape and having no class
    3. otherwise a key matching the class, where a key without shape is
       preferred over a key that carries some other shape

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        form_fields: Optional dictionary of form fields to search in; when
            falsy (None or empty) the global form_fields are used

    Returns:
        The matching form field key (class_uri, shape_uri) or None if no match is found
    """
    if not form_fields:
        from heritrace.extensions import get_form_fields

        form_fields = get_form_fields()
        if not form_fields:
            return None

    # Keys are compared against the string form of the requested URIs.
    wanted_class = str(class_uri) if class_uri else None
    wanted_shape = str(shape_uri) if shape_uri else None

    class_match = None
    shape_match = None

    for field_key in form_fields:
        key_class = field_key[0]
        key_shape = field_key[1]

        class_hit = bool(class_uri) and key_class == wanted_class
        shape_hit = bool(shape_uri) and key_shape == wanted_shape

        if class_hit:
            if shape_hit:
                # Exact match: nothing can beat it.
                return field_key
            # A shape-less key always replaces the current class match; a key
            # bound to another shape is only a fallback when nothing is recorded yet.
            if key_shape is None or not class_match:
                class_match = field_key
        elif shape_hit and key_class is None:
            shape_match = field_key

    # Shape rules typically have higher specificity, so prefer them.
    return shape_match or class_match or None
419def _find_entity_position_in_order_map(entity_uri: str, order_map: dict) -> Optional[int]:
420 """
421 Helper function to find entity position in an order map.
423 This function handles the case where there might be multiple independent ordered chains
424 within the same predicate relationship. Each chain has its own starting element and
425 follows a linked-list structure where each entity points to the next one.
427 Args:
428 entity_uri: URI of the entity to find position for
429 order_map: Dictionary mapping entities to their next entity in sequence.
430 Key = current entity URI, Value = next entity URI (or None for last element)
431 Example: {'entity1': 'entity2', 'entity2': 'entity3', 'entity3': None,
432 'entity4': 'entity5', 'entity5': None}
433 This represents two chains: [entity1 -> entity2 -> entity3] and [entity4 -> entity5]
435 Returns:
436 1-based position in the sequence, or None if not found
437 """
438 # Find all starting elements of ordered chains.
439 # A start element is one that appears as a key in the order_map but never as a value,
440 # meaning no other entity points to it (it's the head of a chain).
441 start_elements = set(order_map.keys()) - set(v for v in order_map.values() if v is not None)
443 if not start_elements:
444 # No valid starting points found - this shouldn't happen in well-formed data
445 return None
447 # Since there can be multiple independent ordered chains, we need to check each one
448 # to find which chain contains our target entity
449 for start_element in start_elements:
450 # Build the complete sequence for this chain by following the linked-list structure
451 sequence = []
452 current_element = start_element
454 # Follow the chain from start to end
455 while current_element in order_map:
456 sequence.append(current_element)
457 # Move to the next element in the chain (or None if we've reached the end)
458 current_element = order_map[current_element]
460 # Check if our target entity is in this particular chain
461 try:
462 # If found, return its 1-based position within this chain
463 return sequence.index(entity_uri) + 1 # Convert from 0-based to 1-based indexing
464 except ValueError:
465 # Entity not found in this chain, try the next one
466 continue
468 # Entity was not found in any of the ordered chains
469 return None
def get_entity_position_in_sequence(entity_uri: str, subject_uri: str, predicate_uri: str,
                                    order_property: str, snapshot: Optional[Graph] = None) -> Optional[int]:
    """
    Get the position of an entity in an ordered sequence.

    The entities linked to `subject_uri` through `predicate_uri` are fetched
    together with the entity each of them points to via `order_property`.
    The resulting "entity -> next entity" map is then resolved by
    _find_entity_position_in_order_map.

    Args:
        entity_uri: URI of the entity to find position for
        subject_uri: URI of the subject that has the ordered property
        predicate_uri: URI of the ordered predicate
        order_property: URI of the property that defines the ordering
        snapshot: Optional graph snapshot for historical queries. When it is
            not None the query runs against it, even if it holds no triples;
            otherwise the query goes to the endpoint returned by get_sparql()

    Returns:
        1-based position in the sequence, or None if not found
    """
    # COALESCE turns a missing ?next into the literal "NONE", so the last
    # element of a chain still produces a row.
    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{subject_uri}> <{predicate_uri}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """

    # Identity check on purpose: an empty rdflib Graph is falsy, and a plain
    # truth test would silently answer a historical query with live data.
    if snapshot is not None:
        entity_next_pairs = [(str(row[0]), str(row[1])) for row in snapshot.query(order_query)]
    else:
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        bindings = sparql.query().convert().get("results", {}).get("bindings", [])
        entity_next_pairs = [
            (binding["orderedEntity"]["value"], binding["nextValue"]["value"])
            for binding in bindings
        ]

    order_map = {
        ordered_entity: (None if next_value == "NONE" else next_value)
        for ordered_entity, next_value in entity_next_pairs
    }
    return _find_entity_position_in_order_map(entity_uri, order_map)