Coverage for heritrace/utils/display_rules_utils.py: 100%
296 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-06-24 11:39 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-06-24 11:39 +0000
1from collections import OrderedDict
2from typing import Dict, List, Optional, Tuple, Union
3from urllib.parse import unquote
5from heritrace.extensions import (get_custom_filter, get_display_rules,
6 get_form_fields, get_sparql)
7from rdflib import ConjunctiveGraph, Graph, Literal, URIRef
8from rdflib.plugins.sparql.algebra import translateQuery
9from rdflib.plugins.sparql.parser import parseQuery
10from SPARQLWrapper import JSON
# Module-level copy of the display rules, resolved once when this module is imported.
# NOTE(review): the functions visible in this module call get_display_rules()
# themselves (or shadow this name with a local) rather than reading this global.
display_rules = get_display_rules()
def find_matching_rule(class_uri=None, shape_uri=None, rules=None):
    """
    Find the most appropriate rule for a given class and/or shape.
    At least one of class_uri or shape_uri must be provided.

    Matching precedence:
      1. A rule whose target names both the requested class and the requested
         shape is returned immediately (first such rule in list order).
      2. Otherwise the best class-only rule and the best shape-only rule are
         tracked independently of each other; a numerically lower ``priority``
         wins and, among equal priorities of the same kind, the first rule wins.
      3. Of those two candidates the shape-only rule is returned when its
         priority is lower than or equal to the class-only rule's priority.

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        rules: Optional list of rules to search in; when empty or None the
            rules returned by get_display_rules() are used

    Returns:
        The matching rule or None if no match is found
    """
    if not rules:
        rules = get_display_rules()
        if not rules:
            return None

    # Normalise the requested URIs once instead of on every iteration.
    wanted_class = str(class_uri) if class_uri else None
    wanted_shape = str(shape_uri) if shape_uri else None

    class_match = None
    shape_match = None

    for rule in rules:
        target = rule["target"]
        rule_priority = rule.get("priority", 0)

        class_ok = wanted_class is not None and target.get("class") == wanted_class
        shape_ok = wanted_shape is not None and target.get("shape") == wanted_shape

        # Case 1: both class and shape match - exact match takes precedence.
        if class_ok and shape_ok:
            return rule

        # Case 2: class-only rule. Compare against the best class-only rule so
        # far, never against a shape-only candidate.
        if class_ok and "shape" not in target:
            if class_match is None or rule_priority < class_match.get("priority", 0):
                class_match = rule

        # Case 3: shape-only rule, tracked separately for the same reason.
        elif shape_ok and "class" not in target:
            if shape_match is None or rule_priority < shape_match.get("priority", 0):
                shape_match = rule

    # Shape rules are more specific, so they win ties against class rules.
    if shape_match is not None and (
        class_match is None
        or shape_match.get("priority", 0) <= class_match.get("priority", 0)
    ):
        return shape_match
    return class_match
def get_class_priority(entity_key):
    """
    Look up the display-rule priority for an entity key.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The "priority" value of the rule matched by find_matching_rule, or 0
        when no rule matches or the matched rule declares no priority.
    """
    matched_rule = find_matching_rule(entity_key[0], entity_key[1])
    if not matched_rule:
        return 0
    return matched_rule.get("priority", 0)
def is_entity_type_visible(entity_key):
    """
    Tell whether entities of the given type should be displayed.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The "shouldBeDisplayed" value of the rule matched by
        find_matching_rule; True when no rule matches or the rule does not
        set that key.
    """
    matched_rule = find_matching_rule(entity_key[0], entity_key[1])
    if not matched_rule:
        return True
    return matched_rule.get("shouldBeDisplayed", True)
def get_sortable_properties(entity_key: Tuple[str, str]) -> List[Dict[str, str]]:
    """
    Gets the sortable properties from display rules for an entity type and/or shape.
    Infers the sorting type from the form fields.

    Each returned dictionary is a copy of one ``sortableBy`` entry of the
    matching rule, extended with:
      - "sortType": "string" by default, or the value computed by
        determine_sort_type from the first form-field definition registered
        for (entity_key, property);
      - "displayName": only when a display property with the same URI exists
        in the rule. The name comes from its first nested display rule, or
        from the display property itself, falling back to the property URI.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        List of dictionaries with sorting information; empty when there are no
        display rules, no matching rule, or the rule has no ``sortableBy``.
    """
    display_rules = get_display_rules()
    if not display_rules:
        return []

    form_fields = get_form_fields()

    class_uri = entity_key[0]
    shape_uri = entity_key[1]

    rule = find_matching_rule(class_uri, shape_uri, display_rules)
    if not rule or "sortableBy" not in rule:
        return []

    # "displayProperties" is optional; a rule that only declares "sortableBy"
    # must not crash here.
    display_properties = rule.get("displayProperties", [])
    # Field definitions for this exact (class, shape) key, if any.
    entity_fields = form_fields.get(entity_key) if form_fields else None

    sort_props = []
    for sort_config in rule["sortableBy"]:
        prop = sort_config.copy()
        property_uri = prop["property"]

        for display_prop in display_properties:
            if display_prop["property"] != property_uri:
                continue
            nested_rules = display_prop.get("displayRules")
            if nested_rules:
                # The first nested rule provides the label; fall back to the
                # property URI when it has no displayName.
                prop["displayName"] = nested_rules[0].get("displayName", property_uri)
            else:
                prop["displayName"] = display_prop.get("displayName", property_uri)
            break

        # Default to string sorting unless the form fields say otherwise.
        prop["sortType"] = "string"
        if entity_fields and property_uri in entity_fields:
            field_definitions = entity_fields[property_uri]
            if field_definitions:
                # Take the first field definition.
                prop["sortType"] = determine_sort_type(field_definitions[0])

        sort_props.append(prop)

    return sort_props
def determine_sort_type(field_info):
    """
    Infer the sort type for a form field.

    Args:
        field_info: Mapping describing the field; the "nodeShape" and
            "datatypes" entries are inspected.

    Returns:
        "string" when the field has a node shape, no datatypes, or a first
        datatype that matches none of the keywords below; otherwise "date",
        "number" or "boolean" depending on which keyword group is found first
        in the lower-cased first datatype.
    """
    # A node shape marks a reference to another entity (sorted by label).
    if field_info.get("nodeShape"):
        return "string"

    if not field_info.get("datatypes"):
        return "string"

    first_datatype = str(field_info["datatypes"][0]).lower()

    # Checked in order: the first group with a keyword contained in the
    # datatype decides the result.
    keyword_groups = (
        ("date", ("date", "time")),
        ("number", ("int", "float", "decimal", "double", "number")),
        ("boolean", ("boolean",)),
    )
    for sort_type, keywords in keyword_groups:
        for keyword in keywords:
            if keyword in first_datatype:
                return sort_type

    return "string"
def get_highest_priority_class(subject_classes):
    """
    Pick the class with the best (numerically lowest) display-rule priority.

    For every class the shape is resolved with determine_shape_for_classes and
    the priority of the resulting (class, shape) key is read with
    get_class_priority. On equal priorities the class encountered first wins.

    Args:
        subject_classes: List of class URIs

    Returns:
        The highest priority class as a string, or None if no classes are
        provided
    """
    # Imported here rather than at module level, as in the original code.
    from heritrace.utils.shacl_utils import determine_shape_for_classes

    if not subject_classes:
        return None

    best_class = None
    best_priority = float('inf')

    for candidate in map(str, subject_classes):
        candidate_key = (candidate, determine_shape_for_classes([candidate]))
        candidate_priority = get_class_priority(candidate_key)
        if candidate_priority < best_priority:
            best_class, best_priority = candidate, candidate_priority

    return best_class
def _ensure_group(grouped_triples, display_name, prop_uri, subject_shape, object_shape):
    """Return the group stored under display_name, creating an empty one if missing."""
    if display_name not in grouped_triples:
        grouped_triples[display_name] = {
            "property": prop_uri,
            "triples": [],
            "subjectShape": subject_shape,
            "objectShape": object_shape,
        }
    return grouped_triples[display_name]


def get_grouped_triples(
    subject: URIRef,
    triples: List[Tuple[URIRef, URIRef, URIRef|Literal]],
    valid_predicates_info: List[str],
    historical_snapshot: Optional[Graph] = None,
    highest_priority_class: Optional[str] = None,
    highest_priority_shape: Optional[str] = None
) -> Tuple[OrderedDict, set]:
    """
    Group the triples of a subject according to the display rules.

    Properties listed in the matching rule's ``displayProperties`` come first,
    in rule order, followed by the remaining ``valid_predicates_info`` entries.
    Properties with a display configuration are handled by
    process_display_rule (and process_ordering when ``orderedBy`` is set);
    all others are handled by process_default_property.

    Args:
        subject: The subject URI
        triples: List of triples for the subject
        valid_predicates_info: List of valid predicates for the subject
        historical_snapshot: Optional historical snapshot graph
        highest_priority_class: The highest priority class URI for the subject
        highest_priority_shape: The shape URI paired with that class

    Returns:
        A 2-tuple ``(grouped_triples, relevant_properties)``: an OrderedDict
        keyed by display name (or property URI) and the set of property URIs
        that have a display configuration. The map of fetched values is used
        internally for ordering and is not returned.
    """
    display_rules = get_display_rules()
    form_fields = get_form_fields()

    grouped_triples = OrderedDict()
    relevant_properties = set()
    # Map of values returned by fetchValueFromQuery to the original values.
    fetched_values_map = {}

    matching_rule = find_matching_rule(highest_priority_class, highest_priority_shape, display_rules)
    # form_fields may be empty/None when no form configuration is available.
    matching_form_field = (
        form_fields.get((highest_priority_class, highest_priority_shape))
        if form_fields
        else None
    )

    # Without rules (or without a matching rule) every property falls back to
    # the default handling, which an empty configuration list achieves.
    if display_rules and matching_rule:
        display_properties = matching_rule.get("displayProperties", [])
    else:
        display_properties = []

    ordered_properties = []
    for prop_config in display_properties:
        if prop_config["property"] not in ordered_properties:
            ordered_properties.append(prop_config["property"])
    for prop_uri in valid_predicates_info:
        if prop_uri not in ordered_properties:
            ordered_properties.append(prop_uri)

    for prop_uri in ordered_properties:
        # First configuration declared for this property, if any.
        current_prop_config = next(
            (config for config in display_properties if config["property"] == prop_uri),
            None,
        )

        if not current_prop_config:
            process_default_property(prop_uri, triples, grouped_triples, highest_priority_shape)
            continue

        if "displayRules" in current_prop_config:
            is_ordered = "orderedBy" in current_prop_config
            order_property = current_prop_config.get("orderedBy")

            for display_rule_nested in current_prop_config["displayRules"]:
                display_name_nested = display_rule_nested.get("displayName", prop_uri)
                object_shape = display_rule_nested.get("shape")
                relevant_properties.add(prop_uri)

                process_display_rule(
                    display_name_nested,
                    prop_uri,
                    display_rule_nested,
                    subject,
                    triples,
                    grouped_triples,
                    fetched_values_map,
                    historical_snapshot,
                    highest_priority_shape,
                    object_shape,
                )

                # Make sure the entry exists before it is decorated below.
                group = _ensure_group(
                    grouped_triples,
                    display_name_nested,
                    prop_uri,
                    highest_priority_shape,
                    object_shape,
                )

                if is_ordered:
                    group["is_draggable"] = True
                    group["ordered_by"] = order_property
                    process_ordering(
                        subject,
                        current_prop_config,
                        order_property,
                        grouped_triples,
                        display_name_nested,
                        fetched_values_map,
                        historical_snapshot,
                    )

                # The nested rule's intermediateRelation takes precedence over
                # the one declared on the property configuration.
                if "intermediateRelation" in display_rule_nested:
                    group["intermediateRelation"] = display_rule_nested["intermediateRelation"]
                elif "intermediateRelation" in current_prop_config:
                    group["intermediateRelation"] = current_prop_config["intermediateRelation"]
        else:
            display_name_simple = current_prop_config.get("displayName", prop_uri)
            relevant_properties.add(prop_uri)

            # The object shape comes from the first form-field definition.
            current_form_field = matching_form_field.get(prop_uri) if matching_form_field else None
            object_shape = None
            if current_form_field:
                object_shape = next(iter(current_form_field)).get("nodeShape")

            process_display_rule(
                display_name_simple,
                prop_uri,
                current_prop_config,
                subject,
                triples,
                grouped_triples,
                fetched_values_map,
                historical_snapshot,
                highest_priority_shape,
                object_shape,
            )

            if "orderedBy" in current_prop_config:
                group = _ensure_group(
                    grouped_triples,
                    display_name_simple,
                    prop_uri,
                    highest_priority_shape,
                    current_prop_config.get("shape"),
                )
                group["is_draggable"] = True
                group["ordered_by"] = current_prop_config.get("orderedBy")
                # process_ordering accepts seven arguments; the subject shape
                # is not one of them.
                process_ordering(
                    subject,
                    current_prop_config,
                    current_prop_config.get("orderedBy"),
                    grouped_triples,
                    display_name_simple,
                    fetched_values_map,
                    historical_snapshot,
                )

            if "intermediateRelation" in current_prop_config:
                group = _ensure_group(
                    grouped_triples,
                    display_name_simple,
                    prop_uri,
                    highest_priority_shape,
                    current_prop_config.get("shape"),
                )
                group["intermediateRelation"] = current_prop_config["intermediateRelation"]

    return grouped_triples, relevant_properties
def process_display_rule(
    display_name,
    prop_uri,
    rule,
    subject,
    triples,
    grouped_triples,
    fetched_values_map,
    historical_snapshot=None,
    subject_shape=None,
    object_shape=None,
):
    """
    Add the triples of one property to the group stored under display_name.

    The group is created when missing. For every triple whose predicate equals
    prop_uri:
      - if the rule has "fetchValueFromQuery", the query is run (against the
        historical snapshot when one is given, otherwise via
        execute_sparql_query); triples with an empty result are skipped, and
        fetched_values_map records ``str(result) -> str(original object)``;
      - otherwise the object is kept as is, except for rdf:type triples whose
        object is replaced by the human readable class label.

    Args:
        display_name: Key of the group in grouped_triples
        prop_uri: Property URI to select triples by
        rule: Display rule (or property configuration) being applied
        subject: The subject URI, passed to the fetch queries
        triples: Iterable of (subject, predicate, object) triples
        grouped_triples: Mapping updated in place
        fetched_values_map: Mapping updated in place
        historical_snapshot: Optional graph queried instead of the live store
        subject_shape: Shape recorded as "subjectShape"
        object_shape: Shape recorded as "objectShape"
    """
    rdf_type_uri = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type'

    grouped_triples.setdefault(
        display_name,
        {
            "property": prop_uri,
            "triples": [],
            "subjectShape": subject_shape,
            "objectShape": object_shape,
            "intermediateRelation": rule.get("intermediateRelation"),
        },
    )

    for triple in triples:
        predicate = str(triple[1])
        if predicate != prop_uri:
            continue

        original_object = triple[2]

        if rule.get("fetchValueFromQuery"):
            if historical_snapshot:
                result, external_entity = execute_historical_query(
                    rule["fetchValueFromQuery"],
                    subject,
                    original_object,
                    historical_snapshot,
                )
            else:
                result, external_entity = execute_sparql_query(
                    rule["fetchValueFromQuery"], subject, original_object
                )
            # Triples for which the query yields nothing are not displayed.
            if not result:
                continue
            fetched_values_map[str(result)] = str(original_object)
            entry = {
                "triple": (str(triple[0]), predicate, str(result)),
                "external_entity": external_entity,
                "object": str(original_object),
                "subjectShape": subject_shape,
                "objectShape": object_shape,
            }
        else:
            if predicate == rdf_type_uri:
                from heritrace.utils.shacl_utils import determine_shape_for_classes
                class_shape = determine_shape_for_classes([original_object])
                result = get_custom_filter().human_readable_class(
                    (original_object, class_shape)
                )
            else:
                result = original_object
            entry = {
                "triple": (str(triple[0]), predicate, result),
                "object": str(original_object),
                "subjectShape": subject_shape,
                "objectShape": object_shape,
            }

        grouped_triples[display_name]["triples"].append(entry)
def execute_sparql_query(query: str, subject: str, value: str) -> Tuple[str, str]:
    """
    Run a templated SELECT query against the configured SPARQL endpoint.

    The placeholders ``[[subject]]`` and ``[[value]]`` are replaced with the
    URL-decoded subject and value wrapped in angle brackets. Only the first
    result row is used; its values are read in the order of the query's
    projected variables.

    Args:
        query: Query template
        subject: Subject URI substituted for ``[[subject]]``
        value: Value URI substituted for ``[[value]]``

    Returns:
        A pair with the first and second projected values of the first row
        (None for a missing variable or binding), or ``(None, None)`` when the
        query returns no rows.
    """
    endpoint = get_sparql()

    subject_term, value_term = unquote(subject), unquote(value)
    bound_query = query.replace("[[subject]]", f"<{subject_term}>")
    bound_query = bound_query.replace("[[value]]", f"<{value_term}>")

    endpoint.setQuery(bound_query)
    endpoint.setReturnFormat(JSON)
    response = endpoint.query().convert()
    bindings = response.get("results", {}).get("bindings", [])
    if not bindings:
        return None, None

    # The projection order is taken from the parsed query ("PV" of the algebra).
    projected_vars = translateQuery(parseQuery(bound_query)).algebra["PV"]
    first_row = bindings[0]
    row_values = [
        first_row.get(str(variable), {}).get("value", None)
        for variable in projected_vars
    ]
    # Pad so that queries projecting fewer than two variables yield None.
    padded = row_values + [None, None]
    return (padded[0], padded[1])
def process_ordering(
    subject,
    prop,
    order_property,
    grouped_triples,
    display_name,
    fetched_values_map,
    historical_snapshot: ConjunctiveGraph | Graph | None = None,
):
    """
    Sort the triples of a group following a linked "next" relation.

    A query collects, for every entity linked from the subject through
    ``prop['property']``, the entity reached through ``order_property`` (or
    the marker "NONE"). The links are followed from every entity that is
    nobody's successor, producing one sequence per start element. The group's
    triples are then sorted in place once per sequence: elements of the
    sequence by their position, everything else after them in stable order.

    Args:
        subject: Subject URI (URL-decoded before being put in the query)
        prop: Property configuration; only ``prop['property']`` is read
        order_property: URI of the property linking an entity to the next one
        grouped_triples: Mapping whose ``[display_name]["triples"]`` is sorted
        display_name: Key of the group to sort
        fetched_values_map: Maps displayed values back to the original object
            values, which are the ones appearing in the sequences
        historical_snapshot: When truthy, queried instead of the live endpoint
    """

    def get_ordered_sequence(order_results):
        order_map = {}
        for res in order_results:
            if isinstance(res, dict):  # For live triplestore results
                ordered_entity = res["orderedEntity"]["value"]
                next_value = res["nextValue"]["value"]
            else:  # For historical snapshot results
                ordered_entity = str(res[0])
                next_value = str(res[1])

            order_map[str(ordered_entity)] = (
                None if str(next_value) == "NONE" else str(next_value)
            )

        all_sequences = []
        start_elements = set(order_map.keys()) - set(order_map.values())
        while start_elements:
            sequence = []
            visited = set()
            current_element = start_elements.pop()
            # The visited guard stops the walk if the chain runs into a cycle
            # (e.g. A -> B -> C -> B) instead of looping forever.
            while current_element in order_map and current_element not in visited:
                visited.add(current_element)
                sequence.append(current_element)
                current_element = order_map[current_element]
            all_sequences.append(sequence)
        return all_sequences

    decoded_subject = unquote(subject)

    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{decoded_subject}> <{prop['property']}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """
    # NOTE(review): this is a truthiness test, as in process_display_rule; a
    # snapshot object that evaluates as false would use the live endpoint.
    if historical_snapshot:
        order_results = list(historical_snapshot.query(order_query))
    else:
        # The endpoint is only needed when no snapshot is queried.
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        order_results = sparql.query().convert().get("results", {}).get("bindings", [])

    def original_value(triple_data):
        # Displayed values may have been fetched by a query; map them back.
        shown = str(triple_data["triple"][2])
        return fetched_values_map.get(shown, shown)

    for sequence in get_ordered_sequence(order_results):
        # Position lookup table: avoids a linear sequence.index() per element.
        positions = {entity: index for index, entity in enumerate(sequence)}
        grouped_triples[display_name]["triples"].sort(
            key=lambda item: positions.get(original_value(item), float("inf"))
        )
def process_default_property(prop_uri, triples, grouped_triples, subject_shape=None):
    """
    Store the triples of a property that has no display configuration.

    The group is keyed by the property URI itself and replaces any group
    already stored under that key. Subject, predicate and object of every
    matching triple are converted to strings; no object shape is recorded.

    Args:
        prop_uri: Property URI to select triples by (also the group key)
        triples: Iterable of (subject, predicate, object) triples
        grouped_triples: Mapping updated in place
        subject_shape: Shape recorded as "subjectShape"
    """
    group = {
        "property": prop_uri,
        "triples": [],
        "subjectShape": subject_shape,
        "objectShape": None,
    }
    grouped_triples[prop_uri] = group

    matching = [candidate for candidate in triples if str(candidate[1]) == prop_uri]
    group["triples"].extend(
        {
            "triple": (str(s), str(p), str(o)),
            "object": str(o),
            "subjectShape": subject_shape,
            "objectShape": None,
        }
        for s, p, o in ((item[0], item[1], item[2]) for item in matching)
    )
def execute_historical_query(
    query: str, subject: str, value: str, historical_snapshot: Graph
) -> Tuple[Optional[str], Optional[str]]:
    """
    Run a templated query against a historical snapshot graph.

    The placeholders ``[[subject]]`` and ``[[value]]`` are replaced with the
    URL-decoded subject and value wrapped in angle brackets. Only the first
    result row is used.

    Args:
        query: Query template
        subject: Subject URI substituted for ``[[subject]]``
        value: Value URI substituted for ``[[value]]``
        historical_snapshot: Object whose ``query`` method executes the query

    Returns:
        A pair with the first and second values of the first row as strings.
        A value is None when the row has no such column or the column is
        unbound, mirroring execute_sparql_query. ``(None, None)`` when there
        are no results.
    """
    decoded_subject = unquote(subject)
    decoded_value = unquote(value)
    query = query.replace("[[subject]]", f"<{decoded_subject}>")
    query = query.replace("[[value]]", f"<{decoded_value}>")

    def as_text(row, position):
        # Keep missing/unbound values as None rather than the string "None",
        # which callers would treat as a real (truthy) result.
        if len(row) <= position or row[position] is None:
            return None
        return str(row[position])

    results = historical_snapshot.query(query)
    if results:
        for result in results:
            return (as_text(result, 0), as_text(result, 1))
    return None, None
def get_property_order_from_rules(highest_priority_class: str, shape_uri: str = None):
    """
    List the property URIs of an entity type in display-rule order.

    When a shape is given, a rule is first looked up for (class, shape); if
    none is found, or no shape is given, the lookup is repeated for the class
    alone. The first rule found decides the result.

    Args:
        highest_priority_class: The highest priority class for the entity
        shape_uri: Optional shape URI for the entity

    Returns:
        The "property" values of the rule's ``displayProperties`` entries, in
        order (entries that are not dicts or lack "property" are skipped).
        An empty list when there are no display rules, no class is given, or
        no rule matches.
    """
    rules = get_display_rules()
    if not rules or not highest_priority_class:
        return []

    # Shape-specific lookup first (when a shape is known), then class only.
    shapes_to_try = (shape_uri, None) if shape_uri else (None,)
    for candidate_shape in shapes_to_try:
        matched_rule = find_matching_rule(highest_priority_class, candidate_shape, rules)
        if matched_rule:
            return [
                entry["property"]
                for entry in matched_rule.get("displayProperties", [])
                if isinstance(entry, dict) and "property" in entry
            ]

    return []
def get_similarity_properties(entity_key: Tuple[str, str]) -> Optional[List[Union[str, Dict[str, List[str]]]]]:
    """Gets the similarity properties configuration for a given entity key.

    This configuration specifies which properties should be used for similarity matching
    using a list-based structure supporting OR logic between elements and
    nested AND logic within elements.

    Example structures:
        - ['prop1', 'prop2']                      # prop1 OR prop2
        - [{'and': ['prop3', 'prop4']}]           # prop3 AND prop4
        - ['prop1', {'and': ['prop2', 'prop3']}]  # prop1 OR (prop2 AND prop3)

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        A list where each element is either a property URI string or a dictionary
        {'and': [list_of_property_uris]}, representing the boolean logic.
        Returns None if no rule matches, if the rule has no
        ``similarity_properties`` (silently), or if the structure is invalid
        (after printing a warning).
    """
    class_uri = entity_key[0]
    shape_uri = entity_key[1]

    # Find the matching rule
    rule = find_matching_rule(class_uri, shape_uri)
    if not rule:
        return None

    similarity_props = rule.get("similarity_properties")

    # An absent configuration is a normal situation, not a malformed one, so
    # it must not produce an "invalid format" warning.
    if similarity_props is None:
        return None

    if not similarity_props or not isinstance(similarity_props, list):
        print(f"Warning: Invalid format for similarity_properties in class {class_uri}")
        return None

    # Shared suffix identifying the entity type in the warnings below.
    context = (
        (f" for class {class_uri}" if class_uri else "")
        + (f" with shape {shape_uri}" if shape_uri else "")
    )

    # Validate each element in the list; the first invalid one rejects the
    # whole configuration.
    validated_props = []
    for item in similarity_props:
        if isinstance(item, str):
            validated_props.append(item)
            continue

        if isinstance(item, dict) and len(item) == 1 and "and" in item:
            and_list = item["and"]
            if isinstance(and_list, list) and and_list and all(isinstance(p, str) for p in and_list):
                validated_props.append(item)
                continue
            print(
                "Warning: Invalid 'and' group in similarity_properties"
                + context
                + ". Expected {'and': ['prop_uri', ...]} with a non-empty list of strings."
            )
            return None  # Invalid 'and' group structure

        print(
            "Warning: Invalid item format in similarity_properties list"
            + context
            + ". Expected a property URI string or {'and': [...]} dict."
        )
        return None  # Invalid item type

    return validated_props if validated_props else None