Coverage for heritrace/utils/display_rules_utils.py: 96%
313 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-01 22:12 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-01 22:12 +0000
1from collections import OrderedDict
2from typing import Dict, List, Optional, Tuple, Union
3from urllib.parse import unquote
5from heritrace.extensions import (get_custom_filter, get_display_rules,
6 get_form_fields, get_sparql)
7from rdflib import ConjunctiveGraph, Graph, Literal, URIRef
8from rdflib.plugins.sparql.algebra import translateQuery
9from rdflib.plugins.sparql.parser import parseQuery
10from SPARQLWrapper import JSON
# Display rules fetched once, when this module is imported.
# NOTE(review): the functions visible in this file call get_display_rules()
# themselves (or bind a local variable with the same name), so this
# module-level value may be unused here - confirm before removing it.
display_rules = get_display_rules()
def find_matching_rule(class_uri=None, shape_uri=None, rules=None):
    """
    Find the most appropriate rule for a given class and/or shape.
    At least one of class_uri or shape_uri must be provided.

    Selection order:
      1. A rule whose target names both the given class and the given shape
         is returned immediately.
      2. Otherwise the best class-only rule and the best shape-only rule are
         tracked independently (lower "priority" wins, first rule wins on
         ties) and the one with the lower priority is returned; a shape-only
         rule is preferred when both priorities are equal.

    Rules without a "target" entry are ignored.

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        rules: Optional list of rules to search in; when empty or omitted the
            rules returned by get_display_rules() are used

    Returns:
        The matching rule or None if no match is found
    """
    if not rules:
        rules = get_display_rules()
        if not rules:
            return None

    class_key = str(class_uri) if class_uri else None
    shape_key = str(shape_uri) if shape_uri else None

    # The two kinds of partial match keep separate "best so far" priorities.
    # Sharing a single tracker lets a candidate of one kind raise the
    # threshold and allow a worse rule of the other kind to replace a
    # better one found earlier.
    class_match = None
    class_priority = float('inf')
    shape_match = None
    shape_priority = float('inf')

    for rule in rules:
        target = rule.get("target") or {}
        rule_priority = rule.get("priority", 0)
        has_class = "class" in target
        has_shape = "shape" in target

        # Case 1: both class and shape match - always takes precedence
        if (class_key and shape_key and has_class and has_shape
                and target["class"] == class_key
                and target["shape"] == shape_key):
            return rule

        # Case 2: class-only rule matching the class
        if class_key and has_class and not has_shape and target["class"] == class_key:
            if class_match is None or rule_priority < class_priority:
                class_match = rule
                class_priority = rule_priority

        # Case 3: shape-only rule matching the shape
        elif shape_key and has_shape and not has_class and target["shape"] == shape_key:
            if shape_match is None or rule_priority < shape_priority:
                shape_match = rule
                shape_priority = rule_priority

    # Shape rules are more specific, so they win when priorities are equal
    if shape_match is not None and (class_match is None or shape_priority <= class_priority):
        return shape_match
    return class_match
def get_class_priority(entity_key):
    """
    Look up the display priority of an entity key.

    The priority is read from the display rule that find_matching_rule
    selects for the key. A matching rule without an explicit "priority"
    counts as 0; a key with no matching rule gets infinity, i.e. it sorts
    after every configured entity type.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The numeric priority (lower means more important)
    """
    matched_rule = find_matching_rule(entity_key[0], entity_key[1])
    if not matched_rule:
        return float('inf')
    return matched_rule.get("priority", 0)
def is_entity_type_visible(entity_key):
    """
    Tell whether an entity type should be displayed.

    The answer is the "shouldBeDisplayed" flag of the display rule selected
    by find_matching_rule. Entity types with no matching rule, or whose rule
    omits the flag, are visible.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The value of "shouldBeDisplayed", defaulting to True
    """
    matched_rule = find_matching_rule(entity_key[0], entity_key[1])
    if not matched_rule:
        return True
    return matched_rule.get("shouldBeDisplayed", True)
def get_sortable_properties(entity_key: Tuple[str, str]) -> List[Dict[str, str]]:
    """
    Gets the sortable properties from display rules for an entity type and/or shape.
    Infers the sorting type from the form fields.

    Each returned dictionary is a shallow copy of one "sortableBy" entry of
    the matching rule, extended with:
      - "displayName": taken from the display property with the same URI
        (the first nested display rule when "displayRules" is present),
        falling back to the property URI when nothing provides a name;
      - "sortType": "string" unless the form fields registered for exactly
        this entity_key describe the property, in which case
        determine_sort_type decides.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        List of dictionaries with sorting information; empty when there are
        no display rules, no matching rule, or the rule has no "sortableBy"
    """
    display_rules = get_display_rules()
    if not display_rules:
        return []

    form_fields = get_form_fields()

    class_uri = entity_key[0]
    shape_uri = entity_key[1]

    rule = find_matching_rule(class_uri, shape_uri, display_rules)
    sortable_by = rule.get("sortableBy") if rule else None
    if not sortable_by:
        return []

    # A rule may declare sortableBy without displayProperties; treat the
    # latter as empty instead of raising KeyError (the other helpers in this
    # module read it with .get as well).
    display_properties = rule.get("displayProperties") or []
    entity_fields = (form_fields.get(entity_key) or {}) if form_fields else {}

    sort_props = []
    for sort_config in sortable_by:
        prop = sort_config.copy()
        property_uri = prop["property"]

        for display_prop in display_properties:
            if display_prop["property"] == property_uri:
                if "displayRules" in display_prop:
                    prop["displayName"] = display_prop["displayRules"][0]["displayName"]
                else:
                    prop["displayName"] = display_prop.get("displayName", property_uri)
                break

        # No display property matched: fall back to the URI so that every
        # entry carries a displayName (a name already present in the
        # sortableBy entry is kept).
        prop.setdefault("displayName", property_uri)

        # Default to string sorting, refined from the first field definition
        prop["sortType"] = "string"
        field_definitions = entity_fields.get(property_uri)
        if field_definitions:
            prop["sortType"] = determine_sort_type(field_definitions[0])

        sort_props.append(prop)

    return sort_props
def determine_sort_type(field_info):
    """
    Derive the sort type ("string", "date", "number" or "boolean") of a field.

    A field that has a "nodeShape" is sorted as a string. Otherwise the
    first entry of "datatypes" is lower-cased and searched for known
    substrings; date/time markers are checked before numeric ones, and
    numeric ones before "boolean". Anything else sorts as a string.

    Args:
        field_info: Field definition dictionary

    Returns:
        The sort type name
    """
    if field_info.get("nodeShape"):
        return "string"

    datatypes = field_info.get("datatypes")
    if not datatypes:
        return "string"

    lowered = str(datatypes[0]).lower()
    # Order matters: the first group with a matching marker wins
    markers_by_sort_type = (
        ("date", ("date", "time")),
        ("number", ("int", "float", "decimal", "double", "number")),
        ("boolean", ("boolean",)),
    )
    for sort_type, markers in markers_by_sort_type:
        for marker in markers:
            if marker in lowered:
                return sort_type
    return "string"
def get_highest_priority_class(subject_classes):
    """
    Pick the class with the best (numerically lowest) display priority.

    Every class is stringified, paired with the shape that
    determine_shape_for_classes resolves for it, and ranked through
    get_class_priority. The first class with the lowest priority wins; when
    no class has a finite priority the first class of the list is returned.

    Args:
        subject_classes: List of class URIs

    Returns:
        The highest priority class as a string, or None if no classes are provided
    """
    from heritrace.utils.shacl_utils import determine_shape_for_classes

    if not subject_classes:
        return None

    def priority_of(candidate):
        candidate_shape = determine_shape_for_classes([candidate])
        return get_class_priority((candidate, candidate_shape))

    # min() keeps the first of equally ranked candidates, so a list whose
    # classes all rank as infinity yields its first element.
    return min((str(candidate) for candidate in subject_classes), key=priority_of)
def get_grouped_triples(
    subject: URIRef,
    triples: List[Tuple[URIRef, URIRef, URIRef|Literal]],
    valid_predicates_info: List[str],
    historical_snapshot: Optional[Graph] = None,
    highest_priority_class: Optional[str] = None,
    highest_priority_shape: Optional[str] = None
) -> Tuple[OrderedDict, set]:
    """
    This function groups the triples based on the display rules.
    It also fetches the values for the properties that are configured to be fetched from the query.

    Properties listed in the matching rule's "displayProperties" come first,
    in rule order, followed by the remaining valid predicates.

    Args:
        subject: The subject URI
        triples: List of triples for the subject
        valid_predicates_info: List of valid predicates for the subject
        historical_snapshot: Optional historical snapshot graph
        highest_priority_class: The highest priority class URI for the subject
        highest_priority_shape: The highest priority shape URI for the subject

    Returns:
        Tuple of (grouped triples, relevant properties). The map of fetched
        values is only used internally to order the triples and is not
        returned.

    Note:
        relevant_properties contains all properties that should be considered
        "relevant" for UI operations (adding/deleting). This includes:
        - Properties configured in display rules when rules exist and match
        - ALL valid properties when no display rules exist or no rules match
        This ensures users can always interact with entities even without display rules.
    """
    display_rules = get_display_rules()
    form_fields = get_form_fields()

    grouped_triples = OrderedDict()
    relevant_properties = set()
    # Maps each value returned by a fetchValueFromQuery query back to the
    # original object, so process_ordering can sort on the original objects.
    fetched_values_map = dict()

    def ensure_group(display_name, prop_uri, object_shape):
        """Create the group for display_name if it does not exist yet and return it."""
        if display_name not in grouped_triples:
            grouped_triples[display_name] = {
                "property": prop_uri,
                "triples": [],
                "subjectClass": highest_priority_class,
                "subjectShape": highest_priority_shape,
                "objectShape": object_shape,
            }
        return grouped_triples[display_name]

    matching_rule = find_matching_rule(highest_priority_class, highest_priority_shape, display_rules)
    rules_apply = bool(display_rules and matching_rule)
    display_properties = (matching_rule.get("displayProperties") or []) if rules_apply else []
    # form_fields may be missing entirely; treat that like "no form field"
    matching_form_field = (
        form_fields.get((highest_priority_class, highest_priority_shape))
        if form_fields else None
    )

    ordered_properties = []
    for prop_config in display_properties:
        if prop_config["property"] not in ordered_properties:
            ordered_properties.append(prop_config["property"])
    for prop_uri in valid_predicates_info:
        if prop_uri not in ordered_properties:
            ordered_properties.append(prop_uri)

    for prop_uri in ordered_properties:
        # Only the first configuration declared for a property is used
        current_prop_config = next(
            (config for config in display_properties if config["property"] == prop_uri),
            None,
        )

        if not current_prop_config:
            # No display rules, no matching rule, or a property without a
            # specific configuration: show it with the default layout.
            relevant_properties.add(prop_uri)
            process_default_property(
                prop_uri, triples, grouped_triples, highest_priority_shape, highest_priority_class
            )
            continue

        if "displayRules" in current_prop_config:
            is_ordered = "orderedBy" in current_prop_config
            order_property = current_prop_config.get("orderedBy")

            for display_rule_nested in current_prop_config["displayRules"]:
                display_name_nested = display_rule_nested.get("displayName", prop_uri)
                relevant_properties.add(prop_uri)
                object_shape = display_rule_nested.get("shape")
                process_display_rule(
                    display_name_nested,
                    prop_uri,
                    display_rule_nested,
                    subject,
                    triples,
                    grouped_triples,
                    fetched_values_map,
                    historical_snapshot,
                    highest_priority_shape,
                    object_shape,
                    highest_priority_class
                )
                group = ensure_group(display_name_nested, prop_uri, object_shape)

                if is_ordered:
                    group["is_draggable"] = True
                    group["ordered_by"] = order_property
                    process_ordering(
                        subject,
                        current_prop_config,
                        order_property,
                        grouped_triples,
                        display_name_nested,
                        fetched_values_map,
                        historical_snapshot,
                    )

                # The nested rule's intermediateRelation wins over the one
                # declared on the property itself
                if "intermediateRelation" in display_rule_nested:
                    group["intermediateRelation"] = display_rule_nested["intermediateRelation"]
                elif "intermediateRelation" in current_prop_config:
                    group["intermediateRelation"] = current_prop_config["intermediateRelation"]
        else:
            display_name_simple = current_prop_config.get("displayName", prop_uri)
            relevant_properties.add(prop_uri)

            # The object shape comes from the first form field definition
            current_form_field = matching_form_field.get(prop_uri) if matching_form_field else None
            object_shape = None
            if current_form_field:
                object_shape = current_form_field[0].get("nodeShape")

            process_display_rule(
                display_name_simple,
                prop_uri,
                current_prop_config,
                subject,
                triples,
                grouped_triples,
                fetched_values_map,
                historical_snapshot,
                highest_priority_shape,
                object_shape,
                highest_priority_class
            )
            if "orderedBy" in current_prop_config:
                group = ensure_group(display_name_simple, prop_uri, current_prop_config.get("shape"))
                group["is_draggable"] = True
                group["ordered_by"] = current_prop_config.get("orderedBy")
                # process_ordering takes seven arguments; passing the shape
                # as an eighth one raised TypeError for every ordered
                # property configured without nested displayRules.
                process_ordering(
                    subject,
                    current_prop_config,
                    current_prop_config.get("orderedBy"),
                    grouped_triples,
                    display_name_simple,
                    fetched_values_map,
                    historical_snapshot,
                )
            if "intermediateRelation" in current_prop_config:
                group = ensure_group(display_name_simple, prop_uri, current_prop_config.get("shape"))
                group["intermediateRelation"] = current_prop_config["intermediateRelation"]

    return grouped_triples, relevant_properties
def process_display_rule(
    display_name,
    prop_uri,
    rule,
    subject,
    triples,
    grouped_triples,
    fetched_values_map,
    historical_snapshot=None,
    subject_shape=None,
    object_shape=None,
    subject_class=None,
):
    """
    Add the triples of one property to the group named display_name.

    The group is created in grouped_triples when missing. For every triple
    whose predicate equals prop_uri one entry is appended to the group:

    - When the rule has "fetchValueFromQuery", the query is executed (on the
      historical snapshot if one is given and truthy, otherwise through
      execute_sparql_query). A truthy result replaces the displayed object
      and is recorded in fetched_values_map (result -> original object); a
      falsy result produces no entry.
    - Otherwise the object is shown as is, except for rdf:type triples,
      whose object is rendered through the custom filter's
      human_readable_class.

    Args:
        display_name: Key of the group inside grouped_triples
        prop_uri: Predicate URI handled by this call
        rule: Display rule (or nested display rule) configuration
        subject: Subject URI, passed to the value query
        triples: Triples of the subject
        grouped_triples: Mapping updated in place
        fetched_values_map: Mapping updated in place
        historical_snapshot: Optional graph used instead of the live store
        subject_shape: Shape recorded on the group and its entries
        object_shape: Object shape recorded on the group and its entries
        subject_class: Class recorded on the group and its entries
    """
    if display_name not in grouped_triples:
        grouped_triples[display_name] = {
            "property": prop_uri,
            "triples": [],
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": object_shape,
            "intermediateRelation": rule.get("intermediateRelation"),
        }

    for statement in triples:
        predicate = str(statement[1])
        if not predicate == prop_uri:
            continue

        raw_object = statement[2]

        if rule.get("fetchValueFromQuery"):
            value_query = rule["fetchValueFromQuery"]
            if historical_snapshot:
                fetched, external_entity = execute_historical_query(
                    value_query, subject, raw_object, historical_snapshot
                )
            else:
                fetched, external_entity = execute_sparql_query(
                    value_query, subject, raw_object
                )
            if not fetched:
                # Nothing to display for this triple
                continue
            fetched_values_map[str(fetched)] = str(raw_object)
            entry = {
                "triple": (str(statement[0]), predicate, str(fetched)),
                "external_entity": external_entity,
                "object": str(raw_object),
                "subjectClass": subject_class,
                "subjectShape": subject_shape,
                "objectShape": object_shape,
            }
        else:
            if predicate == 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type':
                from heritrace.utils.shacl_utils import determine_shape_for_classes
                class_shape = determine_shape_for_classes([raw_object])
                shown_value = get_custom_filter().human_readable_class((raw_object, class_shape))
            else:
                shown_value = raw_object
            entry = {
                "triple": (str(statement[0]), predicate, shown_value),
                "object": str(raw_object),
                "subjectClass": subject_class,
                "subjectShape": subject_shape,
                "objectShape": object_shape,
            }

        grouped_triples[display_name]["triples"].append(entry)
def execute_sparql_query(query: str, subject: str, value: str) -> Tuple[str, str]:
    """
    Run a value-fetching query template against the live SPARQL endpoint.

    The placeholders "[[subject]]" and "[[value]]" are replaced with the
    percent-decoded subject and value wrapped in angle brackets. Only the
    first result row is read; its values are returned in the order of the
    query's projected variables.

    Args:
        query: Query template containing the placeholders
        subject: Subject URI
        value: Object value of the triple being displayed

    Returns:
        (first projected value, second projected value); a missing value is
        None, and (None, None) is returned when the query has no results
    """
    endpoint = get_sparql()

    substitutions = (
        ("[[subject]]", f"<{unquote(subject)}>"),
        ("[[value]]", f"<{unquote(value)}>"),
    )
    for placeholder, replacement in substitutions:
        query = query.replace(placeholder, replacement)

    endpoint.setQuery(query)
    endpoint.setReturnFormat(JSON)
    bindings = endpoint.query().convert().get("results", {}).get("bindings", [])
    if not bindings:
        return None, None

    # The JSON bindings are keyed by variable name, so the projection order
    # has to be recovered from the parsed query itself.
    projected_variables = translateQuery(parseQuery(query)).algebra["PV"]
    first_row = bindings[0]
    picked = [
        first_row.get(str(variable), {}).get("value", None)
        for variable in projected_variables
    ]
    # Pad so that positions 0 and 1 always exist
    picked.extend((None, None))
    return (picked[0], picked[1])
def process_ordering(
    subject,
    prop,
    order_property,
    grouped_triples,
    display_name,
    fetched_values_map,
    historical_snapshot: ConjunctiveGraph | Graph | None = None,
):
    """
    Sort the triples of a display group along their "next element" chain.

    The objects linked to the subject through prop["property"] are queried
    together with the value of order_property on each of them (the pointer
    to the next element). The chains built from these pointers are then used
    to sort grouped_triples[display_name]["triples"] in place. Entries that
    do not belong to a chain keep their relative order after the chained ones.

    Args:
        subject: Subject URI (percent-decoded before being put in the query)
        prop: Property configuration; only prop["property"] is read
        order_property: URI of the property pointing to the next element
        grouped_triples: Mapping whose display_name group is sorted in place
        display_name: Key of the group to sort
        fetched_values_map: Maps displayed (fetched) values back to the
            original objects, which are the ones appearing in the chains
        historical_snapshot: When given and truthy, the query runs on this
            graph instead of the live SPARQL endpoint
    """
    def get_ordered_sequence(order_results):
        """Turn (entity, next) rows into a list of chains, each in order."""
        order_map = {}
        for res in order_results:
            if isinstance(res, dict):  # For live triplestore results
                ordered_entity = res["orderedEntity"]["value"]
                next_value = res["nextValue"]["value"]
            else:  # For historical snapshot results
                ordered_entity = str(res[0])
                next_value = str(res[1])

            order_map[str(ordered_entity)] = (
                None if str(next_value) == "NONE" else str(next_value)
            )

        # A chain starts at an entity that no other entity points to.
        # Starts are visited in sorted order so that the outcome does not
        # depend on set iteration order when there are several chains.
        start_elements = set(order_map) - set(order_map.values())
        all_sequences = []
        for start_element in sorted(start_elements):
            sequence = []
            visited = set()
            current_element = start_element
            # "visited" stops the walk if the pointers loop back into the
            # chain (e.g. a -> b -> c -> b), which would otherwise never end.
            while current_element in order_map and current_element not in visited:
                visited.add(current_element)
                sequence.append(current_element)
                current_element = order_map[current_element]
            all_sequences.append(sequence)
        return all_sequences

    decoded_subject = unquote(subject)

    sparql = get_sparql()

    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{decoded_subject}> <{prop['property']}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """
    if historical_snapshot:
        order_results = list(historical_snapshot.query(order_query))
    else:
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        order_results = sparql.query().convert().get("results", {}).get("bindings", [])

    for sequence in get_ordered_sequence(order_results):
        # Position lookup table: avoids a linear sequence.index() per key
        positions = {entity: index for index, entity in enumerate(sequence)}

        def sort_key(entry, positions=positions):
            displayed = str(entry["triple"][2])
            original = fetched_values_map.get(displayed, displayed)
            return positions.get(original, float("inf"))

        grouped_triples[display_name]["triples"].sort(key=sort_key)
def process_default_property(prop_uri, triples, grouped_triples, subject_shape=None, subject_class=None):
    """
    Group the triples of a property that has no display configuration.

    The group is stored under the property URI itself and replaces any group
    already stored under that key. Every triple whose predicate equals
    prop_uri contributes one entry with all three terms converted to strings.

    Args:
        prop_uri: Predicate URI, also used as the display name
        triples: Triples of the subject
        grouped_triples: Mapping updated in place
        subject_shape: Shape recorded on the group and its entries
        subject_class: Class recorded on the group and its entries
    """
    group = {
        "property": prop_uri,
        "triples": [],
        "subjectClass": subject_class,
        "subjectShape": subject_shape,
        "objectShape": None
    }
    grouped_triples[prop_uri] = group

    group["triples"].extend(
        {
            "triple": (str(statement[0]), str(statement[1]), str(statement[2])),
            "object": str(statement[2]),
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": None,
        }
        for statement in triples
        if str(statement[1]) == prop_uri
    )
def execute_historical_query(
    query: str, subject: str, value: str, historical_snapshot: Graph
) -> Tuple[Optional[str], Optional[str]]:
    """
    Run a value-fetching query template against a historical snapshot.

    The placeholders "[[subject]]" and "[[value]]" are replaced with the
    percent-decoded subject and value wrapped in angle brackets. Only the
    first result row is read.

    Args:
        query: Query template containing the placeholders
        subject: Subject URI
        value: Object value of the triple being displayed
        historical_snapshot: Graph on which the query is executed

    Returns:
        (first value, second value) of the first row as strings. A column
        that is unbound or not selected is returned as None - matching
        execute_sparql_query - rather than as the string "None". Returns
        (None, None) when the query has no results.
    """
    decoded_subject = unquote(subject)
    decoded_value = unquote(value)
    query = query.replace("[[subject]]", f"<{decoded_subject}>")
    query = query.replace("[[value]]", f"<{decoded_value}>")
    results = historical_snapshot.query(query)
    if results:
        for row in results:
            # Queries may project a single variable; pad the missing column
            first_value = row[0] if len(row) > 0 else None
            second_value = row[1] if len(row) > 1 else None
            return (
                str(first_value) if first_value is not None else None,
                str(second_value) if second_value is not None else None,
            )
    return None, None
def get_property_order_from_rules(highest_priority_class: str, shape_uri: str = None):
    """
    List the property URIs of the matching display rule, in rule order.

    Entries of "displayProperties" that are not dictionaries or that lack a
    "property" key are skipped.

    Args:
        highest_priority_class: The highest priority class for the entity
        shape_uri: Optional shape URI for the entity

    Returns:
        List of property URIs in the order specified by display rules; empty
        when no class is given or no rule matches
    """
    if not highest_priority_class:
        return []

    matched_rule = find_matching_rule(highest_priority_class, shape_uri)
    if not matched_rule:
        return []

    return [
        entry["property"]
        for entry in matched_rule.get("displayProperties", [])
        if isinstance(entry, dict) and "property" in entry
    ]
def get_predicate_ordering_info(predicate_uri: str, highest_priority_class: str, entity_shape: str = None) -> Optional[str]:
    """
    Return the "orderedBy" property configured for a predicate, if any.

    Only the first display property whose "property" equals predicate_uri is
    consulted.

    Args:
        predicate_uri: URI of the predicate to check
        highest_priority_class: The highest priority class for the subject entity
        entity_shape: Optional shape for the subject entity

    Returns:
        The ordering property URI if the predicate is ordered, None otherwise
        (also when there are no display rules or no rule matches)
    """
    rules = get_display_rules()
    if not rules:
        return None

    matched_rule = find_matching_rule(highest_priority_class, entity_shape, rules)
    if not matched_rule:
        return None

    return next(
        (
            entry.get("orderedBy")
            for entry in matched_rule.get("displayProperties", [])
            if isinstance(entry, dict) and entry.get("property") == predicate_uri
        ),
        None,
    )
def get_shape_order_from_display_rules(highest_priority_class: str, entity_shape: str, predicate_uri: str) -> list:
    """
    List the shapes declared in the nested display rules of a predicate.

    The first display property that targets predicate_uri and has
    "displayRules" provides the result; nested rules without a (truthy)
    "shape" are left out.

    Args:
        highest_priority_class: The highest priority class for the entity
        entity_shape: The shape for the subject entity
        predicate_uri: The predicate URI to get shape ordering for

    Returns:
        List of shape URIs in the order specified in displayRules, or empty list if no rules found
    """
    rules = get_display_rules()
    if not rules:
        return []

    matched_rule = find_matching_rule(highest_priority_class, entity_shape, rules)
    if not matched_rule or "displayProperties" not in matched_rule:
        return []

    for entry in matched_rule["displayProperties"]:
        if entry["property"] == predicate_uri and "displayRules" in entry:
            declared_shapes = (nested.get("shape") for nested in entry["displayRules"])
            return [shape for shape in declared_shapes if shape]

    return []
def get_similarity_properties(entity_key: Tuple[str, str]) -> Optional[List[Union[str, Dict[str, List[str]]]]]:
    """Return the validated "similarity_properties" of the matching rule.

    The configuration is a list read as an OR of its elements, where an
    element is either a property URI or an AND group of property URIs.

    Example structures:
        - ['prop1', 'prop2']                      # prop1 OR prop2
        - [{'and': ['prop3', 'prop4']}]           # prop3 AND prop4
        - ['prop1', {'and': ['prop2', 'prop3']}]  # prop1 OR (prop2 AND prop3)

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The list of elements (strings and {'and': [...]} dictionaries), or
        None when no rule matches, nothing is configured, the configuration
        is not a list, or any element is malformed (a warning is printed in
        the last case).
    """
    class_uri = entity_key[0]
    shape_uri = entity_key[1]

    rule = find_matching_rule(class_uri, shape_uri)
    if not rule:
        return None

    configured = rule.get("similarity_properties")
    if not (configured and isinstance(configured, list)):
        return None

    def warn(headline, expectation):
        # Same message layout for both kinds of validation failure
        print(
            headline
            + (f" for class {class_uri}" if class_uri else "")
            + (f" with shape {shape_uri}" if shape_uri else "")
            + expectation
        )

    accepted = []
    for entry in configured:
        if isinstance(entry, str):
            accepted.append(entry)
            continue

        if not (isinstance(entry, dict) and len(entry) == 1 and "and" in entry):
            warn(
                "Warning: Invalid item format in similarity_properties list",
                f". Expected a property URI string or {{'and': [...]}} dict.",
            )
            return None  # Invalid item type

        members = entry["and"]
        if not (isinstance(members, list) and members and all(isinstance(m, str) for m in members)):
            warn(
                "Warning: Invalid 'and' group in similarity_properties",
                f". Expected {{'and': ['prop_uri', ...]}} with a non-empty list of strings.",
            )
            return None  # Invalid 'and' group structure

        accepted.append(entry)

    return accepted if accepted else None