Coverage for heritrace/utils/shacl_utils.py: 96%
164 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-01 22:12 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-01 22:12 +0000
1from typing import List, Optional, Tuple
3from flask import Flask
4from heritrace.extensions import get_shacl_graph, get_sparql
5from heritrace.utils.display_rules_utils import get_class_priority
6from heritrace.utils.shacl_display import (apply_display_rules,
7 extract_shacl_form_fields,
8 order_form_fields,
9 process_nested_shapes)
10from rdflib import RDF, Graph
11from SPARQLWrapper import JSON
def get_form_fields_from_shacl(shacl: Graph, display_rules: List[dict], app: Flask):
    """
    Build the form field definitions for every entity type found in a SHACL graph.

    Args:
        shacl: The SHACL graph to read shapes from
        display_rules: The display rules configuration
        app: Flask application instance

    Returns:
        The result of order_form_fields: form fields keyed by (class, shape)
        tuples. An empty dict is returned when the SHACL graph is falsy
        (missing or empty).
    """
    if not shacl:
        return dict()

    # Raw field definitions taken directly from the SHACL shapes
    fields_by_entity = extract_shacl_form_fields(shacl, display_rules, app=app)

    # Attach nested shapes; the same set is handed to every call
    seen_shapes = set()
    for predicates in fields_by_entity.values():
        for field_list in predicates.values():
            for field in field_list:
                node_shape = field.get("nodeShape")
                if not node_shape:
                    continue
                field["nestedShape"] = process_nested_shapes(
                    shacl,
                    display_rules,
                    node_shape,
                    app=app,
                    processed_shapes=seen_shapes,
                )

    # Display rules are optional; skip the step entirely when none are given
    if display_rules:
        fields_by_entity = apply_display_rules(shacl, fields_by_entity, display_rules)

    # Fields not covered by a display rule still need a displayName
    ensure_display_names(fields_by_entity)

    # Final ordering is driven by the display rules
    return order_form_fields(fields_by_entity, display_rules)
def determine_shape_for_classes(class_list: List[str]) -> Optional[str]:
    """
    Pick the SHACL shape that best fits a list of class URIs.

    Args:
        class_list: List of class URIs to look up shapes for

    Returns:
        The shape URI selected by _find_highest_priority_shape, or None when
        the SHACL graph is falsy or no shape targets any of the classes
    """
    graph = get_shacl_graph()
    if not graph:
        return None

    class_shape_pairs = []
    for class_uri in class_list:
        # Every shape that declares this class as its sh:targetClass
        target_query = f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
            }}
        """
        class_shape_pairs.extend(
            (class_uri, str(row.shape)) for row in graph.query(target_query)
        )

    return _find_highest_priority_shape(class_shape_pairs)
def determine_shape_for_entity_triples(entity_triples: list) -> Optional[str]:
    """
    Pick the SHACL shape that best fits an entity, judging by its triples.

    Candidate shapes are ranked by a tuple compared element by element:
    1. number of sh:hasValue constraints the entity satisfies
    2. number of shape properties the entity actually uses
    3. negated class priority as returned by get_class_priority

    Args:
        entity_triples: List of triples (subject, predicate, object) for the entity

    Returns:
        The best-scoring shape URI, or None when the SHACL graph is falsy, the
        entity has no rdf:type, or no shape targets any of its classes
    """
    graph = get_shacl_graph()
    if not graph:
        return None

    rdf_type = str(RDF.type)
    classes = []
    used_predicates = set()
    for _, predicate, obj in entity_triples:
        predicate_str = str(predicate)
        if predicate_str == rdf_type:
            classes.append(str(obj))
        # Every predicate counts as a used property, rdf:type included
        used_predicates.add(predicate_str)

    if not classes:
        return None

    candidates = []
    for class_uri in classes:
        target_query = f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
            }}
        """
        candidates.extend(
            (class_uri, str(row.shape)) for row in graph.query(target_query)
        )

    if not candidates:
        return None

    # A single candidate needs no scoring
    if len(candidates) == 1:
        return candidates[0][1]

    scores = {}
    for class_uri, shape_uri in candidates:
        matched_properties = len(used_predicates & _get_shape_properties(graph, shape_uri))
        matched_values = _check_hasvalue_constraints(graph, shape_uri, entity_triples)
        priority = get_class_priority((class_uri, shape_uri))

        # Priority is negated so that a smaller priority number ranks higher
        scores[shape_uri] = (matched_values, matched_properties, -priority)

    return max(scores, key=scores.get)
162def _find_highest_priority_shape(class_shape_pairs: List[Tuple[str, str]]) -> Optional[str]:
163 """
164 Helper function to find the shape with the highest priority from a list of (class_uri, shape) pairs.
166 Args:
167 class_shape_pairs: List of tuples (class_uri, shape)
169 Returns:
170 The shape with the highest priority, or None if the list is empty
171 """
172 highest_priority = float('inf')
173 highest_priority_shape = None
175 for class_uri, shape in class_shape_pairs:
176 entity_key = (class_uri, shape)
177 priority = get_class_priority(entity_key)
178 if priority < highest_priority:
179 highest_priority = priority
180 highest_priority_shape = shape
182 return highest_priority_shape
185def _get_shape_properties(shacl_graph: Graph, shape_uri: str) -> set:
186 """
187 Extract all properties defined in a SHACL shape.
189 Args:
190 shacl_graph: The SHACL graph
191 shape_uri: URI of the shape to analyze
193 Returns:
194 Set of property URIs defined in the shape
195 """
196 properties = set()
198 query_string = f"""
199 PREFIX sh: <http://www.w3.org/ns/shacl#>
200 SELECT DISTINCT ?property WHERE {{
201 <{shape_uri}> sh:property ?propertyShape .
202 ?propertyShape sh:path ?property .
203 }}
204 """
206 results = shacl_graph.query(query_string)
207 for row in results:
208 properties.add(str(row.property))
210 return properties
213def _check_hasvalue_constraints(shacl_graph: Graph, shape_uri: str, entity_triples: list) -> int:
214 """
215 Check how many sh:hasValue constraints the entity satisfies for a given shape.
217 Args:
218 shacl_graph: The SHACL graph
219 shape_uri: URI of the shape to check
220 entity_triples: List of triples (subject, predicate, object) for the entity
222 Returns:
223 Number of hasValue constraints satisfied by the entity
224 """
225 # Get all hasValue constraints for this shape
226 query_string = f"""
227 PREFIX sh: <http://www.w3.org/ns/shacl#>
228 SELECT DISTINCT ?property ?value WHERE {{
229 <{shape_uri}> sh:property ?propertyShape .
230 ?propertyShape sh:path ?property .
231 ?propertyShape sh:hasValue ?value .
232 }}
233 """
235 results = shacl_graph.query(query_string)
236 constraints = [(str(row.property), str(row.value)) for row in results]
238 if not constraints:
239 return 0
241 # Create a set of (predicate, object) pairs from entity triples
242 entity_property_values = set()
243 for _, predicate, obj in entity_triples:
244 entity_property_values.add((str(predicate), str(obj)))
246 # Count how many constraints are satisfied
247 satisfied_constraints = 0
248 for property_uri, required_value in constraints:
249 if (property_uri, required_value) in entity_property_values:
250 satisfied_constraints += 1
252 return satisfied_constraints
def ensure_display_names(form_fields):
    """
    Give every form field a displayName, deriving one from the predicate URI when missing.

    The fields are updated in place; nothing is returned.

    Args:
        form_fields: Dictionary of form fields to process
    """
    from heritrace.utils.filters import format_uri_as_readable

    for predicates in form_fields.values():
        for predicate_uri, field_list in predicates.items():
            for field in field_list:
                # An existing, non-empty displayName is left untouched
                if field.get("displayName"):
                    continue
                field["displayName"] = format_uri_as_readable(predicate_uri)
def find_matching_form_field(class_uri=None, shape_uri=None, form_fields=None):
    """
    Locate the form field key that best fits a class and/or a shape.
    At least one of class_uri or shape_uri is expected to be provided.

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        form_fields: Optional dictionary of form fields to search in; when it is
            falsy the dictionary returned by get_form_fields() is used instead

    Returns:
        The matching form field key (class_uri, shape_uri) or None if no match is found
    """
    if not form_fields:
        from heritrace.extensions import get_form_fields
        form_fields = get_form_fields()
        if not form_fields:
            return None

    # Stringify once; None marks "not requested"
    wanted_class = str(class_uri) if class_uri else None
    wanted_shape = str(shape_uri) if shape_uri else None

    class_only_key = None
    shape_only_key = None

    for key in form_fields:
        key_class, key_shape = key[0], key[1]

        if (
            wanted_class is not None
            and wanted_shape is not None
            and key_class == wanted_class
            and key_shape == wanted_shape
        ):
            # Exact (class, shape) hit wins immediately
            return key

        if wanted_class is not None and key_class == wanted_class and key_shape is None:
            # Key constrains the class only
            class_only_key = key
        elif wanted_shape is not None and key_shape == wanted_shape and key_class is None:
            # Key constrains the shape only
            shape_only_key = key

    # A shape-only key is preferred over a class-only key
    if shape_only_key:
        return shape_only_key
    return class_only_key if class_only_key else None
323def _find_entity_position_in_order_map(entity_uri: str, order_map: dict) -> Optional[int]:
324 """
325 Helper function to find entity position in an order map.
327 This function handles the case where there might be multiple independent ordered chains
328 within the same predicate relationship. Each chain has its own starting element and
329 follows a linked-list structure where each entity points to the next one.
331 Args:
332 entity_uri: URI of the entity to find position for
333 order_map: Dictionary mapping entities to their next entity in sequence.
334 Key = current entity URI, Value = next entity URI (or None for last element)
335 Example: {'entity1': 'entity2', 'entity2': 'entity3', 'entity3': None,
336 'entity4': 'entity5', 'entity5': None}
337 This represents two chains: [entity1 -> entity2 -> entity3] and [entity4 -> entity5]
339 Returns:
340 1-based position in the sequence, or None if not found
341 """
342 # Find all starting elements of ordered chains.
343 # A start element is one that appears as a key in the order_map but never as a value,
344 # meaning no other entity points to it (it's the head of a chain).
345 start_elements = set(order_map.keys()) - set(v for v in order_map.values() if v is not None)
347 if not start_elements:
348 # No valid starting points found - this shouldn't happen in well-formed data
349 return None
351 # Since there can be multiple independent ordered chains, we need to check each one
352 # to find which chain contains our target entity
353 for start_element in start_elements:
354 # Build the complete sequence for this chain by following the linked-list structure
355 sequence = []
356 current_element = start_element
358 # Follow the chain from start to end
359 while current_element in order_map:
360 sequence.append(current_element)
361 # Move to the next element in the chain (or None if we've reached the end)
362 current_element = order_map[current_element]
364 # Check if our target entity is in this particular chain
365 try:
366 # If found, return its 1-based position within this chain
367 return sequence.index(entity_uri) + 1 # Convert from 0-based to 1-based indexing
368 except ValueError:
369 # Entity not found in this chain, try the next one
370 continue
372 # Entity was not found in any of the ordered chains
373 return None
def get_entity_position_in_sequence(entity_uri: str, subject_uri: str, predicate_uri: str,
                                    order_property: str, snapshot: Optional[Graph] = None) -> Optional[int]:
    """
    Get the position of an entity in an ordered sequence.

    The ordered entities are read from the snapshot when one is given, and
    from the endpoint returned by get_sparql() otherwise.

    Args:
        entity_uri: URI of the entity to find position for
        subject_uri: URI of the subject that has the ordered property
        predicate_uri: URI of the ordered predicate
        order_property: URI of the property that defines the ordering
        snapshot: Optional graph snapshot for historical queries

    Returns:
        1-based position in the sequence, or None if not found
    """
    # COALESCE turns a missing "next" link into the marker string "NONE"
    # so that both result formats below can be handled the same way.
    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{subject_uri}> <{predicate_uri}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """

    # Compare against None explicitly: an rdflib Graph with no triples is falsy,
    # and an empty snapshot must not fall through to the live endpoint.
    if snapshot is not None:
        pairs = [(str(row[0]), str(row[1])) for row in snapshot.query(order_query)]
    else:
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        bindings = sparql.query().convert().get("results", {}).get("bindings", [])
        pairs = [
            (binding["orderedEntity"]["value"], binding["nextValue"]["value"])
            for binding in bindings
        ]

    # entity -> next entity, with None marking the end of a chain
    order_map = {
        ordered_entity: (None if next_value == "NONE" else next_value)
        for ordered_entity, next_value in pairs
    }

    return _find_entity_position_in_order_map(entity_uri, order_map)