Coverage for heritrace/utils/shacl_utils.py: 96%
206 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-10-13 17:12 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-10-13 17:12 +0000
1from typing import List, Optional, Tuple
3from flask import Flask
4from heritrace.extensions import get_shacl_graph, get_sparql
5from heritrace.utils.display_rules_utils import get_class_priority
6from heritrace.utils.shacl_display import (apply_display_rules,
7 extract_shacl_form_fields,
8 order_form_fields,
9 process_nested_shapes)
10from rdflib import RDF, Graph
11from SPARQLWrapper import JSON
def get_form_fields_from_shacl(shacl: Graph, display_rules: List[dict], app: Flask):
    """
    Build the form-field configuration for every entity type from SHACL shapes.

    The pipeline is: extract raw fields, resolve nested shapes, apply display
    rules (when any are given), fill in missing display names, add virtual
    properties and finally order the fields.

    Args:
        shacl: The SHACL graph
        display_rules: The display rules configuration
        app: Flask application instance

    Returns:
        The ordered form fields produced by ``order_form_fields``; keys are
        (class, shape) tuples and values map each predicate to a list of
        field dictionaries. An empty dict is returned when ``shacl`` is falsy.
    """
    if not shacl:
        return {}

    # Raw fields straight from the shapes
    fields_by_entity = extract_shacl_form_fields(shacl, display_rules, app=app)

    # Resolve nested shapes; a single set is shared by every call so that
    # process_nested_shapes can track what it has already handled.
    seen_shapes = set()
    for predicates in fields_by_entity.values():
        for field_list in predicates.values():
            for field in field_list:
                if not field.get("nodeShape"):
                    continue
                field["nestedShape"] = process_nested_shapes(
                    shacl,
                    display_rules,
                    field["nodeShape"],
                    app=app,
                    processed_shapes=seen_shapes,
                )

    # Display rules are optional
    if display_rules:
        fields_by_entity = apply_display_rules(shacl, fields_by_entity, display_rules)

    # Fields not covered by a display rule still need a displayName
    ensure_display_names(fields_by_entity)

    # Virtual properties are added before ordering so they get ordered too
    with_virtual = add_virtual_properties_to_form_fields_internal(fields_by_entity)
    return order_form_fields(with_virtual, display_rules)
def add_virtual_properties_to_form_fields_internal(form_fields: dict) -> dict:
    """
    Attach the configured virtual properties to each entity's form fields.

    For every (class, shape) key, the virtual properties returned by
    ``get_virtual_properties_for_entity`` are turned into synthetic field
    entries flagged with ``is_virtual``. Their nested shape is copied from the
    form fields of the target ("intermediate") entity, with any
    ``fieldOverrides`` applied.

    Args:
        form_fields: The original form_fields dictionary

    Returns:
        A shallow copy of form_fields with the virtual properties included.
        The per-entity dictionaries are shared with the input, so the virtual
        entries are visible through ``form_fields`` as well.
    """
    from heritrace.utils.virtual_properties import get_virtual_properties_for_entity

    result = form_fields.copy() if form_fields else {}

    for entity_key in result:
        entity_class, entity_shape = entity_key
        virtual_props = get_virtual_properties_for_entity(entity_class, entity_shape)
        if not virtual_props:
            continue

        for display_name, prop_config in virtual_props:
            if not prop_config.get("shouldBeDisplayed", True):
                continue

            implementation = prop_config.get("implementedVia", {})
            target = implementation.get("target", {})
            target_class = target.get("class")
            target_shape = target.get("shape")

            # No explicit shape configured: derive one from the target class
            if not target_shape and target_class:
                target_shape = determine_shape_for_classes([target_class])

            target_key = find_matching_form_field(
                class_uri=target_class,
                shape_uri=target_shape,
                form_fields=result
            )

            nested_fields = []
            if target_key:
                overrides_by_property = implementation.get("fieldOverrides", {})

                for prop_uri, details_list in result.get(target_key, {}).items():
                    for details in details_list:
                        # Copy so overrides never leak into the target entity's own fields
                        field = details.copy()

                        if prop_uri in overrides_by_property:
                            override = overrides_by_property[prop_uri]
                            for copied_key in ("shouldBeDisplayed", "displayName"):
                                if copied_key in override:
                                    field[copied_key] = override[copied_key]
                            if "value" in override:
                                # A fixed value replaces whatever nested form existed
                                field["hasValue"] = override["value"]
                                field["nestedShape"] = []

                        if field.get('shouldBeDisplayed', True):
                            nested_fields.append(field)

            result[entity_key][display_name] = [{
                "displayName": prop_config.get("displayName", display_name),
                "uri": display_name,
                "is_virtual": True,
                "min": 0,
                "max": None,
                "datatypes": [],
                "optionalValues": [],
                "orderedBy": None,
                "nodeShape": None,
                "subjectClass": None,
                "subjectShape": None,
                "objectClass": None,
                "entityType": None,
                "nestedShape": nested_fields,
                "or": None
            }]

    return result
def determine_shape_for_classes(class_list: List[str]) -> Optional[str]:
    """
    Choose the best SHACL shape among those targeting any of the given classes.

    Args:
        class_list: List of class URIs to find shapes for

    Returns:
        The shape URI selected by ``_find_highest_priority_shape``, or None
        when the SHACL graph is missing/empty or no shape targets the classes
    """
    graph = get_shacl_graph()
    if not graph:
        return None

    class_shape_pairs = []
    for class_uri in class_list:
        target_query = f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
            }}
        """
        # Keep the class alongside each shape: priority is looked up per (class, shape)
        class_shape_pairs.extend(
            (class_uri, str(row.shape)) for row in graph.query(target_query)
        )

    return _find_highest_priority_shape(class_shape_pairs)
def determine_shape_for_entity_triples(entity_triples: list) -> Optional[str]:
    """
    Pick the SHACL shape that best fits an entity, based on its triples.

    Candidate shapes are those whose sh:targetClass is one of the entity's
    rdf:type values. With a single candidate it is returned directly;
    otherwise candidates are ranked by the tuple
    (hasValue matches, property matches, -priority), compared in that order:
    1. number of sh:hasValue constraints the entity satisfies
    2. number of shape properties that the entity actually uses
    3. class priority from ``get_class_priority`` (lower number wins)

    Args:
        entity_triples: List of triples (subject, predicate, object) for the entity

    Returns:
        The most appropriate shape URI, or None if the SHACL graph is
        missing/empty, the entity has no rdf:type, or no shape targets its classes
    """
    graph = get_shacl_graph()
    if not graph:
        return None

    rdf_type = str(RDF.type)
    declared_classes = []
    used_predicates = set()
    for _, predicate, obj in entity_triples:
        predicate_str = str(predicate)
        if predicate_str == rdf_type:
            declared_classes.append(str(obj))
        used_predicates.add(predicate_str)

    if not declared_classes:
        return None

    candidates = []
    for class_uri in declared_classes:
        target_query = f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
            }}
        """
        candidates.extend(
            (class_uri, str(row.shape)) for row in graph.query(target_query)
        )

    if not candidates:
        return None

    if len(candidates) == 1:
        return candidates[0][1]

    scores = {}
    for class_uri, shape_uri in candidates:
        shape_properties = _get_shape_properties(graph, shape_uri)
        property_matches = len(used_predicates.intersection(shape_properties))
        hasvalue_matches = _check_hasvalue_constraints(graph, shape_uri, entity_triples)
        priority = get_class_priority((class_uri, shape_uri))

        # Priority is negated so that a lower priority number ranks higher under max()
        scores[shape_uri] = (hasvalue_matches, property_matches, -priority)

    return max(scores, key=scores.get)
250def _find_highest_priority_shape(class_shape_pairs: List[Tuple[str, str]]) -> Optional[str]:
251 """
252 Helper function to find the shape with the highest priority from a list of (class_uri, shape) pairs.
254 Args:
255 class_shape_pairs: List of tuples (class_uri, shape)
257 Returns:
258 The shape with the highest priority, or None if the list is empty
259 """
260 highest_priority = float('inf')
261 highest_priority_shape = None
263 for class_uri, shape in class_shape_pairs:
264 entity_key = (class_uri, shape)
265 priority = get_class_priority(entity_key)
266 if priority < highest_priority:
267 highest_priority = priority
268 highest_priority_shape = shape
270 return highest_priority_shape
273def _get_shape_properties(shacl_graph: Graph, shape_uri: str) -> set:
274 """
275 Extract all properties defined in a SHACL shape.
277 Args:
278 shacl_graph: The SHACL graph
279 shape_uri: URI of the shape to analyze
281 Returns:
282 Set of property URIs defined in the shape
283 """
284 properties = set()
286 query_string = f"""
287 PREFIX sh: <http://www.w3.org/ns/shacl#>
288 SELECT DISTINCT ?property WHERE {{
289 <{shape_uri}> sh:property ?propertyShape .
290 ?propertyShape sh:path ?property .
291 }}
292 """
294 results = shacl_graph.query(query_string)
295 for row in results:
296 properties.add(str(row.property))
298 return properties
301def _check_hasvalue_constraints(shacl_graph: Graph, shape_uri: str, entity_triples: list) -> int:
302 """
303 Check how many sh:hasValue constraints the entity satisfies for a given shape.
305 Args:
306 shacl_graph: The SHACL graph
307 shape_uri: URI of the shape to check
308 entity_triples: List of triples (subject, predicate, object) for the entity
310 Returns:
311 Number of hasValue constraints satisfied by the entity
312 """
313 # Get all hasValue constraints for this shape
314 query_string = f"""
315 PREFIX sh: <http://www.w3.org/ns/shacl#>
316 SELECT DISTINCT ?property ?value WHERE {{
317 <{shape_uri}> sh:property ?propertyShape .
318 ?propertyShape sh:path ?property .
319 ?propertyShape sh:hasValue ?value .
320 }}
321 """
323 results = shacl_graph.query(query_string)
324 constraints = [(str(row.property), str(row.value)) for row in results]
326 if not constraints:
327 return 0
329 # Create a set of (predicate, object) pairs from entity triples
330 entity_property_values = set()
331 for _, predicate, obj in entity_triples:
332 entity_property_values.add((str(predicate), str(obj)))
334 # Count how many constraints are satisfied
335 satisfied_constraints = 0
336 for property_uri, required_value in constraints:
337 if (property_uri, required_value) in entity_property_values:
338 satisfied_constraints += 1
340 return satisfied_constraints
def ensure_display_names(form_fields):
    """
    Give every form field a displayName, modifying form_fields in place.

    Fields whose displayName is missing or falsy receive a fallback derived
    from their predicate URI via ``format_uri_as_readable``; existing names
    are left untouched.

    Args:
        form_fields: Dictionary of form fields to process
    """
    from heritrace.utils.filters import format_uri_as_readable

    for predicates in form_fields.values():
        for predicate_uri, field_list in predicates.items():
            for field in field_list:
                if field.get("displayName"):
                    continue
                field["displayName"] = format_uri_as_readable(predicate_uri)
def find_matching_form_field(class_uri=None, shape_uri=None, form_fields=None):
    """
    Locate the form-field key that best matches a class and/or a shape.
    At least one of class_uri or shape_uri must be provided.

    Matching rules, from strongest to weakest:
    - a key matching both class and shape is returned immediately
    - a key with the requested shape and no class
    - a key with the requested class and no shape
    - the first key with the requested class, whatever its shape

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        form_fields: Optional dictionary of form fields to search in; when it
            is missing or empty the global form fields are used instead

    Returns:
        The matching form field key (class_uri, shape_uri) or None if no match is found
    """
    if not form_fields:
        from heritrace.extensions import get_form_fields
        form_fields = get_form_fields()

    if not form_fields:
        return None

    # Normalize once; None means "criterion not supplied"
    wanted_class = str(class_uri) if class_uri else None
    wanted_shape = str(shape_uri) if shape_uri else None

    by_class = None
    by_shape = None

    for key in form_fields:
        key_class, key_shape = key[0], key[1]
        class_hit = wanted_class is not None and key_class == wanted_class
        shape_hit = wanted_shape is not None and key_shape == wanted_shape

        if class_hit and shape_hit:
            # Exact match: nothing can beat it
            return key

        if class_hit and key_shape is None:
            # A shape-less key always replaces a weaker class-only candidate
            by_class = key
        elif shape_hit and key_class is None:
            by_shape = key
        elif class_hit and not by_class:
            # Class matches but the key is bound to another shape: keep only the first
            by_class = key

    # Shape-based matches are considered more specific than class-based ones
    if by_shape:
        return by_shape
    if by_class:
        return by_class
    return None
415def _find_entity_position_in_order_map(entity_uri: str, order_map: dict) -> Optional[int]:
416 """
417 Helper function to find entity position in an order map.
419 This function handles the case where there might be multiple independent ordered chains
420 within the same predicate relationship. Each chain has its own starting element and
421 follows a linked-list structure where each entity points to the next one.
423 Args:
424 entity_uri: URI of the entity to find position for
425 order_map: Dictionary mapping entities to their next entity in sequence.
426 Key = current entity URI, Value = next entity URI (or None for last element)
427 Example: {'entity1': 'entity2', 'entity2': 'entity3', 'entity3': None,
428 'entity4': 'entity5', 'entity5': None}
429 This represents two chains: [entity1 -> entity2 -> entity3] and [entity4 -> entity5]
431 Returns:
432 1-based position in the sequence, or None if not found
433 """
434 # Find all starting elements of ordered chains.
435 # A start element is one that appears as a key in the order_map but never as a value,
436 # meaning no other entity points to it (it's the head of a chain).
437 start_elements = set(order_map.keys()) - set(v for v in order_map.values() if v is not None)
439 if not start_elements:
440 # No valid starting points found - this shouldn't happen in well-formed data
441 return None
443 # Since there can be multiple independent ordered chains, we need to check each one
444 # to find which chain contains our target entity
445 for start_element in start_elements:
446 # Build the complete sequence for this chain by following the linked-list structure
447 sequence = []
448 current_element = start_element
450 # Follow the chain from start to end
451 while current_element in order_map:
452 sequence.append(current_element)
453 # Move to the next element in the chain (or None if we've reached the end)
454 current_element = order_map[current_element]
456 # Check if our target entity is in this particular chain
457 try:
458 # If found, return its 1-based position within this chain
459 return sequence.index(entity_uri) + 1 # Convert from 0-based to 1-based indexing
460 except ValueError:
461 # Entity not found in this chain, try the next one
462 continue
464 # Entity was not found in any of the ordered chains
465 return None
def get_entity_position_in_sequence(entity_uri: str, subject_uri: str, predicate_uri: str,
                                    order_property: str, snapshot: Optional[Graph] = None) -> Optional[int]:
    """
    Get the position of an entity in an ordered sequence.

    The entities linked to ``subject_uri`` through ``predicate_uri`` are
    fetched together with the entity each one points to via
    ``order_property``. The resulting entity -> next-entity map is then handed
    to ``_find_entity_position_in_order_map``.

    Args:
        entity_uri: URI of the entity to find position for
        subject_uri: URI of the subject that has the ordered property
        predicate_uri: URI of the ordered predicate
        order_property: URI of the property that defines the ordering
        snapshot: Optional graph snapshot for historical queries. Any graph
            that is not None is queried directly, even an empty one; the
            SPARQL endpoint from ``get_sparql`` is used only when no snapshot
            is supplied.

    Returns:
        1-based position in the sequence, or None if not found
    """
    # COALESCE turns a missing successor into the "NONE" marker, so every
    # ordered entity yields a row even when it is the last of its chain.
    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{subject_uri}> <{predicate_uri}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """

    # Test identity, not truthiness: an rdflib Graph with zero triples is
    # falsy, and a historical lookup must never fall through to live data.
    if snapshot is not None:
        links = [(str(row[0]), str(row[1])) for row in snapshot.query(order_query)]
    else:
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        bindings = sparql.query().convert().get("results", {}).get("bindings", [])
        links = [
            (binding["orderedEntity"]["value"], binding["nextValue"]["value"])
            for binding in bindings
        ]

    order_map = {
        ordered_entity: (None if next_value == "NONE" else next_value)
        for ordered_entity, next_value in links
    }
    return _find_entity_position_in_order_map(entity_uri, order_map)