Coverage for heritrace / utils / display_rules_utils.py: 94%
398 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7import logging
8from collections import OrderedDict
9from dataclasses import dataclass
10from typing import TYPE_CHECKING
11from urllib.parse import unquote
13from rdflib import Graph, Literal, URIRef
14from rdflib.plugins.sparql.algebra import translateQuery
15from rdflib.plugins.sparql.parser import parseQuery
16from SPARQLWrapper import JSON
18from heritrace.extensions import (
19 get_custom_filter,
20 get_display_rules,
21 get_form_fields,
22 get_sparql,
23 get_sparql_bindings,
24 select_results,
25)
27if TYPE_CHECKING:
28 from rdflib.query import ResultRow
31@dataclass(slots=True)
32class GroupingContext:
33 subject: URIRef
34 triples: list[tuple[URIRef, URIRef, URIRef | Literal]]
35 grouped_triples: OrderedDict
36 fetched_values_map: dict[str, str]
37 relevant_properties: set
38 historical_snapshot: Graph | None
39 highest_priority_class: str | None
40 highest_priority_shape: str | None
43_SUBJECT_LABEL_PAIR_LENGTH = 2
46def find_matching_rule(
47 class_uri: str | None = None,
48 shape_uri: str | None = None,
49 rules: list[dict] | None = None,
50) -> dict | None:
51 """
52 Find the most appropriate rule for a given class and/or shape.
53 At least one of class_uri or shape_uri must be provided.
55 Args:
56 class_uri: Optional URI of the class
57 shape_uri: Optional URI of the shape
58 rules: Optional list of rules to search in, defaults to global display_rules
60 Returns:
61 The matching rule or None if no match is found
62 """
63 if not rules:
64 rules = get_display_rules()
65 if not rules:
66 return None
68 # Initialize variables to track potential matches
69 class_match = None
70 shape_match = None
71 highest_priority = float("inf")
73 # Scan all rules to find the best match based on priority
74 for rule in rules:
75 rule_priority = rule.get("priority", 0)
77 # Case 1: Both class and shape match (exact match)
78 if (
79 class_uri
80 and shape_uri
81 and "class" in rule["target"]
82 and rule["target"]["class"] == str(class_uri)
83 and "shape" in rule["target"]
84 and rule["target"]["shape"] == str(shape_uri)
85 ):
86 # Exact match always takes highest precedence
87 return rule
89 # Case 2: Only class matches
90 if (
91 class_uri
92 and "class" in rule["target"]
93 and rule["target"]["class"] == str(class_uri)
94 and "shape" not in rule["target"]
95 ):
96 if class_match is None or rule_priority < highest_priority:
97 class_match = rule
98 highest_priority = rule_priority
100 # Case 3: Only shape matches
101 elif (
102 shape_uri
103 and "shape" in rule["target"]
104 and rule["target"]["shape"] == str(shape_uri)
105 and "class" not in rule["target"]
106 ) and (shape_match is None or rule_priority < highest_priority):
107 shape_match = rule
108 highest_priority = rule_priority
110 # Return the best match based on priority
111 # Shape rules typically have higher specificity,
112 # so prefer them if they have equal priority
113 if shape_match and (
114 class_match is None
115 or shape_match.get("priority", 0) <= class_match.get("priority", 0)
116 ):
117 return shape_match
118 if class_match:
119 return class_match
121 return None
124def get_class_priority(entity_key: tuple[str, str | None]) -> float:
125 """
126 Returns the priority of a specific entity key (class_uri, shape_uri).
127 Calculates the priority directly from the display rules.
128 Classes without defined rules receive the lowest priority (highest number).
130 Args:
131 entity_key: A tuple (class_uri, shape_uri)
132 """
133 class_uri = entity_key[0]
134 shape_uri = entity_key[1]
136 rule = find_matching_rule(class_uri, shape_uri)
137 return rule.get("priority", 0) if rule else float("inf")
140def is_entity_type_visible(entity_key: tuple[str, str | None]) -> bool:
141 """
142 Determines if an entity type should be displayed.
144 Args:
145 entity_key: A tuple (class_uri, shape_uri)
146 """
147 class_uri = entity_key[0]
148 shape_uri = entity_key[1]
150 rule = find_matching_rule(class_uri, shape_uri)
151 return rule.get("shouldBeDisplayed", True) if rule else True
154def get_sortable_properties(entity_key: tuple[str, str | None]) -> list[dict[str, str]]:
155 """
156 Gets the sortable properties from display rules for an entity type and/or shape.
157 Infers the sorting type from form_fields_cache.
159 Args:
160 entity_key: A tuple (class_uri, shape_uri)
162 Returns:
163 List of dictionaries with sorting information
164 """
165 display_rules = get_display_rules()
166 if not display_rules:
167 return []
169 form_fields = get_form_fields()
171 class_uri = entity_key[0]
172 shape_uri = entity_key[1]
174 rule = find_matching_rule(class_uri, shape_uri, display_rules)
175 if not rule or "sortableBy" not in rule:
176 return []
178 sort_props = []
179 for sort_config in rule["sortableBy"]:
180 prop = sort_config.copy()
182 for display_prop in rule["displayProperties"]:
183 if display_prop["property"] == prop["property"]:
184 if "displayRules" in display_prop:
185 prop["displayName"] = display_prop["displayRules"][0]["displayName"]
186 else:
187 prop["displayName"] = display_prop.get(
188 "displayName", prop["property"]
189 )
190 break
192 # Default to string sorting
193 prop["sortType"] = "string"
195 # Try to determine the sort type from form fields
196 if form_fields and (
197 entity_key in form_fields and prop["property"] in form_fields[entity_key]
198 ):
199 field_info = form_fields[entity_key][prop["property"]][
200 0
201 ] # Take the first field definition
202 prop["sortType"] = determine_sort_type(field_info)
204 sort_props.append(prop)
206 return sort_props
209def determine_sort_type(field_info: dict) -> str:
210 """Helper function to determine sort type from field info."""
211 # If there's a shape, it's a reference to an entity (sort by label)
212 if field_info.get("nodeShape"):
213 return "string"
214 # Otherwise look at the datatypes
215 if field_info.get("datatypes"):
216 datatype = str(field_info["datatypes"][0]).lower()
217 if any(t in datatype for t in ["date", "time"]):
218 return "date"
219 if any(t in datatype for t in ["int", "float", "decimal", "double", "number"]):
220 return "number"
221 if "boolean" in datatype:
222 return "boolean"
223 # Default to string
224 return "string"
227def get_highest_priority_class(subject_classes: list[str]) -> str | None:
228 """
229 Find the highest priority class from the given list of classes.
231 Args:
232 subject_classes: List of class URIs
234 Returns:
235 The highest priority class or None if no classes are provided
236 """
237 from heritrace.utils.shacl_utils import determine_shape_for_classes # noqa: PLC0415
239 if not subject_classes:
240 return None
242 highest_priority = float("inf")
243 highest_priority_class = None
245 for raw_class_uri in subject_classes:
246 class_uri = str(raw_class_uri)
247 shape = determine_shape_for_classes([class_uri])
248 entity_key = (class_uri, shape)
249 priority = get_class_priority(entity_key)
250 if priority < highest_priority:
251 highest_priority = priority
252 highest_priority_class = class_uri
254 if highest_priority_class is None and subject_classes:
255 highest_priority_class = str(subject_classes[0])
257 return highest_priority_class
260def _ensure_grouped_entry(
261 ctx: GroupingContext,
262 display_name: str,
263 prop_uri: str,
264 object_shape: str | None,
265) -> None:
266 if display_name not in ctx.grouped_triples:
267 ctx.grouped_triples[display_name] = {
268 "property": prop_uri,
269 "triples": [],
270 "subjectClass": ctx.highest_priority_class,
271 "subjectShape": ctx.highest_priority_shape,
272 "objectShape": object_shape,
273 }
276def _apply_ordering_to_group(
277 ctx: GroupingContext,
278 current_prop_config: dict,
279 order_property: str | None,
280 display_name: str,
281) -> None:
282 ctx.grouped_triples[display_name]["is_draggable"] = True
283 ctx.grouped_triples[display_name]["ordered_by"] = order_property
284 process_ordering(
285 ctx,
286 current_prop_config,
287 order_property,
288 display_name,
289 )
292def _apply_intermediate_relation(
293 grouped_triples: OrderedDict,
294 display_name: str,
295 *sources: dict,
296) -> None:
297 for source in sources:
298 if "intermediateRelation" in source:
299 grouped_triples[display_name]["intermediateRelation"] = source[
300 "intermediateRelation"
301 ]
302 return
305def _process_property_with_nested_display_rules(
306 prop_uri: str,
307 current_prop_config: dict,
308 ctx: GroupingContext,
309) -> None:
310 is_ordered = "orderedBy" in current_prop_config
311 order_property = current_prop_config.get("orderedBy")
313 for display_rule_nested in current_prop_config["displayRules"]:
314 display_name_nested = display_rule_nested.get("displayName", prop_uri)
315 ctx.relevant_properties.add(prop_uri)
316 object_shape = display_rule_nested.get("shape")
317 if current_prop_config.get("isVirtual"):
318 process_virtual_property_display(
319 display_name_nested,
320 current_prop_config,
321 ctx,
322 )
323 else:
324 process_display_rule(
325 display_name_nested,
326 prop_uri,
327 display_rule_nested,
328 ctx,
329 object_shape=object_shape,
330 )
331 if is_ordered and not current_prop_config.get("isVirtual", False):
332 _apply_ordering_to_group(
333 ctx,
334 current_prop_config,
335 order_property,
336 display_name_nested,
337 )
339 _ensure_grouped_entry(
340 ctx,
341 display_name_nested,
342 prop_uri,
343 display_rule_nested.get("shape"),
344 )
346 _apply_intermediate_relation(
347 ctx.grouped_triples,
348 display_name_nested,
349 display_rule_nested,
350 current_prop_config,
351 )
354def _process_property_with_simple_config(
355 prop_uri: str,
356 current_prop_config: dict,
357 current_form_field: list[dict] | None,
358 ctx: GroupingContext,
359) -> None:
360 display_name_simple = current_prop_config.get("displayName", prop_uri)
361 # Only add non-virtual properties to relevant_properties
362 # Virtual properties are handled separately in entity.py
363 if not current_prop_config.get("isVirtual"):
364 ctx.relevant_properties.add(prop_uri)
366 object_shape = None
367 if current_form_field:
368 for form_field in current_form_field:
369 object_shape = form_field.get("nodeShape")
370 break
372 if current_prop_config.get("isVirtual"):
373 process_virtual_property_display(
374 display_name_simple,
375 current_prop_config,
376 ctx,
377 )
378 else:
379 process_display_rule(
380 display_name_simple,
381 prop_uri,
382 current_prop_config,
383 ctx,
384 object_shape=object_shape,
385 )
386 if "orderedBy" in current_prop_config and not current_prop_config.get(
387 "isVirtual", False
388 ):
389 _ensure_grouped_entry(
390 ctx,
391 display_name_simple,
392 prop_uri,
393 current_prop_config.get("shape"),
394 )
395 _apply_ordering_to_group(
396 ctx,
397 current_prop_config,
398 current_prop_config.get("orderedBy"),
399 display_name_simple,
400 )
401 if "intermediateRelation" in current_prop_config:
402 _ensure_grouped_entry(
403 ctx,
404 display_name_simple,
405 prop_uri,
406 current_prop_config.get("shape"),
407 )
408 ctx.grouped_triples[display_name_simple]["intermediateRelation"] = (
409 current_prop_config["intermediateRelation"]
410 )
413def _process_property_with_display_rules(
414 prop_uri: str,
415 matching_rule: dict,
416 matching_form_field: dict | None,
417 ctx: GroupingContext,
418) -> None:
419 current_prop_config = None
420 for prop_config in matching_rule.get("displayProperties", []):
421 config_identifier = (
422 prop_config.get("displayName")
423 if prop_config.get("isVirtual")
424 else prop_config.get("property")
425 )
426 if config_identifier == prop_uri:
427 current_prop_config = prop_config
428 break
430 current_form_field = (
431 matching_form_field.get(prop_uri) if matching_form_field else None
432 )
434 if current_prop_config:
435 if "displayRules" in current_prop_config:
436 _process_property_with_nested_display_rules(
437 prop_uri,
438 current_prop_config,
439 ctx,
440 )
441 else:
442 _process_property_with_simple_config(
443 prop_uri,
444 current_prop_config,
445 current_form_field,
446 ctx,
447 )
448 else:
449 # Property without specific configuration - add to relevant_properties
450 # Don't process properties without config
451 # (they are not virtual in this case)
452 ctx.relevant_properties.add(prop_uri)
453 process_default_property(
454 prop_uri,
455 ctx.triples,
456 ctx.grouped_triples,
457 ctx.highest_priority_shape,
458 ctx.highest_priority_class,
459 )
462def get_grouped_triples(
463 subject: URIRef,
464 triples: list[tuple[URIRef, URIRef, URIRef | Literal]],
465 valid_predicates_info: list[str],
466 historical_snapshot: Graph | None = None,
467 entity_key: tuple[str | None, str | None] = (None, None),
468) -> tuple[OrderedDict, set]:
469 highest_priority_class, highest_priority_shape = entity_key
470 display_rules = get_display_rules()
471 form_fields = get_form_fields()
473 grouped_triples = OrderedDict()
474 relevant_properties: set = set()
475 fetched_values_map: dict[str, str] = {}
477 ctx = GroupingContext(
478 subject=subject,
479 triples=triples,
480 grouped_triples=grouped_triples,
481 fetched_values_map=fetched_values_map,
482 relevant_properties=relevant_properties,
483 historical_snapshot=historical_snapshot,
484 highest_priority_class=highest_priority_class,
485 highest_priority_shape=highest_priority_shape,
486 )
488 matching_rule = find_matching_rule(
489 highest_priority_class, highest_priority_shape, display_rules
490 )
491 matching_form_field = form_fields.get(
492 (highest_priority_class, highest_priority_shape)
493 )
495 ordered_properties = []
496 if display_rules and matching_rule:
497 for prop_config in matching_rule.get("displayProperties", []):
498 if prop_config.get("isVirtual"):
499 prop_uri = prop_config.get("displayName")
500 else:
501 prop_uri = prop_config.get("property")
502 if prop_uri and prop_uri not in ordered_properties:
503 ordered_properties.append(prop_uri)
505 for prop_uri in valid_predicates_info:
506 if prop_uri not in ordered_properties:
507 ordered_properties.append(prop_uri)
509 for prop_uri in ordered_properties:
510 if display_rules and matching_rule:
511 _process_property_with_display_rules(
512 prop_uri,
513 matching_rule,
514 matching_form_field,
515 ctx,
516 )
517 else:
518 # No display rules or no matching rule -
519 # add all properties to relevant_properties
520 ctx.relevant_properties.add(prop_uri)
521 process_default_property(
522 prop_uri,
523 ctx.triples,
524 ctx.grouped_triples,
525 ctx.highest_priority_shape,
526 ctx.highest_priority_class,
527 )
529 ctx.grouped_triples = OrderedDict(ctx.grouped_triples)
530 return ctx.grouped_triples, ctx.relevant_properties
533def process_display_rule(
534 display_name: str,
535 prop_uri: str,
536 rule: dict,
537 ctx: GroupingContext,
538 object_shape: str | None = None,
539) -> None:
540 if display_name not in ctx.grouped_triples:
541 ctx.grouped_triples[display_name] = {
542 "property": prop_uri,
543 "triples": [],
544 "subjectClass": ctx.highest_priority_class,
545 "subjectShape": ctx.highest_priority_shape,
546 "objectShape": object_shape,
547 "intermediateRelation": rule.get("intermediateRelation"),
548 }
549 for triple in ctx.triples:
550 if str(triple[1]) == prop_uri:
551 if rule.get("fetchValueFromQuery"):
552 if ctx.historical_snapshot:
553 result, external_entity = execute_historical_query(
554 rule["fetchValueFromQuery"],
555 ctx.subject,
556 triple[2],
557 ctx.historical_snapshot,
558 )
559 else:
560 result, external_entity = execute_sparql_query(
561 rule["fetchValueFromQuery"], ctx.subject, triple[2]
562 )
563 if result:
564 ctx.fetched_values_map[str(result)] = str(triple[2])
565 new_triple = (str(triple[0]), str(triple[1]), str(result))
566 object_uri = str(triple[2])
567 new_triple_data = {
568 "triple": new_triple,
569 "external_entity": external_entity,
570 "object": object_uri,
571 "subjectClass": ctx.highest_priority_class,
572 "subjectShape": ctx.highest_priority_shape,
573 "objectShape": object_shape,
574 }
575 ctx.grouped_triples[display_name]["triples"].append(new_triple_data)
576 else:
577 if str(triple[1]) == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
578 from heritrace.utils.shacl_utils import ( # noqa: PLC0415
579 determine_shape_for_classes,
580 )
582 object_class_shape = determine_shape_for_classes([triple[2]])
583 result = get_custom_filter().human_readable_class(
584 (triple[2], object_class_shape)
585 )
586 else:
587 result = triple[2]
589 object_uri = str(triple[2])
591 new_triple_data = {
592 "triple": (str(triple[0]), str(triple[1]), result),
593 "object": object_uri,
594 "subjectClass": ctx.highest_priority_class,
595 "subjectShape": ctx.highest_priority_shape,
596 "objectShape": object_shape,
597 }
598 ctx.grouped_triples[display_name]["triples"].append(new_triple_data)
601def _fetch_virtual_property_entities(
602 reference_field: str,
603 target_class: str | None,
604 ctx: GroupingContext,
605) -> list[str]:
606 decoded_subject = unquote(str(ctx.subject))
608 query = f"""
609 SELECT DISTINCT ?entity
610 WHERE {{
611 ?entity <{reference_field}> <{decoded_subject}> .
612 """
614 if target_class:
615 query += f"""
616 ?entity a <{target_class}> .
617 """
619 query += """
620 }
621 """
623 if ctx.historical_snapshot:
624 return [
625 str(row[0]) for row in select_results(ctx.historical_snapshot.query(query))
626 ]
628 sparql = get_sparql()
629 sparql.setQuery(query)
630 sparql.setReturnFormat(JSON)
631 bindings = get_sparql_bindings(sparql.query().convert())
632 return [res["entity"]["value"] for res in bindings]
635def _build_virtual_property_triples(
636 display_name: str,
637 prop_config: dict,
638 target_shape: str | None,
639 entity_uris: list[str],
640 ctx: GroupingContext,
641) -> None:
642 if display_name not in ctx.grouped_triples:
643 ctx.grouped_triples[display_name] = {
644 "property": display_name,
645 "triples": [],
646 "subjectClass": ctx.highest_priority_class,
647 "subjectShape": ctx.highest_priority_shape,
648 "objectShape": None,
649 "is_virtual": True,
650 }
652 for entity_uri in entity_uris:
653 if ctx.historical_snapshot:
654 result, external_entity = execute_historical_query(
655 prop_config["fetchValueFromQuery"],
656 ctx.subject,
657 URIRef(entity_uri),
658 ctx.historical_snapshot,
659 )
660 else:
661 result, external_entity = execute_sparql_query(
662 prop_config["fetchValueFromQuery"], str(ctx.subject), entity_uri
663 )
665 if result:
666 ctx.fetched_values_map[str(result)] = entity_uri
667 new_triple_data = {
668 "triple": (str(ctx.subject), display_name, str(result)),
669 "external_entity": external_entity,
670 "object": entity_uri,
671 "subjectClass": ctx.highest_priority_class,
672 "subjectShape": ctx.highest_priority_shape,
673 "objectShape": target_shape,
674 "is_virtual": True,
675 }
676 ctx.grouped_triples[display_name]["triples"].append(new_triple_data)
679def process_virtual_property_display(
680 display_name: str,
681 prop_config: dict,
682 ctx: GroupingContext,
683) -> None:
684 implementation = prop_config.get("implementedVia", {})
685 field_overrides = implementation.get("fieldOverrides", {})
686 target = implementation.get("target", {})
687 target_class = target.get("class")
689 reference_field = None
690 for field_uri, override in field_overrides.items():
691 if override.get("value") == "${currentEntity}":
692 reference_field = field_uri
693 break
695 if not reference_field:
696 return
698 entity_uris = _fetch_virtual_property_entities(reference_field, target_class, ctx)
700 if prop_config.get("fetchValueFromQuery") and entity_uris:
701 _build_virtual_property_triples(
702 display_name, prop_config, target.get("shape"), entity_uris, ctx
703 )
705 elif display_name not in ctx.grouped_triples:
706 ctx.grouped_triples[display_name] = {
707 "property": display_name,
708 "triples": [],
709 "subjectClass": ctx.highest_priority_class,
710 "subjectShape": ctx.highest_priority_shape,
711 "objectShape": None,
712 "is_virtual": True,
713 }
716def execute_sparql_query(
717 query: str, subject: str, value: str
718) -> tuple[str | None, str | None]:
719 sparql = get_sparql()
721 decoded_subject = unquote(subject)
722 decoded_value = unquote(value)
723 query = query.replace("[[subject]]", f"<{decoded_subject}>")
724 query = query.replace("[[value]]", f"<{decoded_value}>")
725 sparql.setQuery(query)
726 sparql.setReturnFormat(JSON)
727 bindings = get_sparql_bindings(sparql.query().convert())
728 if bindings:
729 parsed_query = parseQuery(query)
730 algebra_query = translateQuery(parsed_query).algebra
731 variable_order = algebra_query["PV"]
732 result = bindings[0]
733 values = [
734 result.get(str(var_name), {}).get("value", None)
735 for var_name in variable_order
736 ]
737 first_value = values[0] if len(values) > 0 else None
738 second_value = values[1] if len(values) > 1 else None
739 return (first_value, second_value)
740 return None, None
743def process_ordering(
744 ctx: GroupingContext,
745 prop: dict,
746 order_property: str | None,
747 display_name: str,
748) -> None:
749 def get_ordered_sequence(
750 order_results: list[dict[str, dict[str, str]]] | list[ResultRow],
751 ) -> list[list[str]]:
752 order_map = {}
753 for res in order_results:
754 if isinstance(res, dict): # For live triplestore results
755 ordered_entity = res["orderedEntity"]["value"]
756 next_value = res["nextValue"]["value"]
757 else: # For historical snapshot results
758 ordered_entity = str(res[0])
759 next_value = str(res[1])
761 order_map[str(ordered_entity)] = (
762 None if str(next_value) == "NONE" else str(next_value)
763 )
765 all_sequences = []
766 start_elements = set(order_map.keys()) - set(order_map.values())
767 while start_elements:
768 sequence = []
769 current_element = start_elements.pop()
770 while current_element in order_map:
771 sequence.append(current_element)
772 current_element = order_map[current_element]
773 all_sequences.append(sequence)
774 return all_sequences
776 decoded_subject = unquote(ctx.subject)
778 sparql = get_sparql()
780 order_query = f"""
781 SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
782 WHERE {{
783 <{decoded_subject}> <{prop["property"]}> ?orderedEntity.
784 OPTIONAL {{
785 ?orderedEntity <{order_property}> ?next.
786 }}
787 }}
788 """
789 if ctx.historical_snapshot:
790 order_results: list[dict[str, dict[str, str]]] | list[ResultRow] = list(
791 select_results(ctx.historical_snapshot.query(order_query))
792 )
793 else:
794 sparql.setQuery(order_query)
795 sparql.setReturnFormat(JSON)
796 order_results = get_sparql_bindings(sparql.query().convert())
798 order_sequences = get_ordered_sequence(order_results)
799 for sequence in order_sequences:
800 ctx.grouped_triples[display_name]["triples"].sort(
801 key=lambda x: (
802 sequence.index(
803 ctx.fetched_values_map.get(str(x["triple"][2]), str(x["triple"][2]))
804 )
805 if ctx.fetched_values_map.get(str(x["triple"][2]), str(x["triple"][2]))
806 in sequence
807 else float("inf")
808 )
809 )
812def process_default_property(
813 prop_uri: str,
814 triples: list[tuple[URIRef, URIRef, URIRef | Literal]],
815 grouped_triples: OrderedDict,
816 subject_shape: str | None = None,
817 subject_class: str | None = None,
818) -> None:
819 display_name = prop_uri
820 grouped_triples[display_name] = {
821 "property": prop_uri,
822 "triples": [],
823 "subjectClass": subject_class,
824 "subjectShape": subject_shape,
825 "objectShape": None,
826 }
827 triples_for_prop = [triple for triple in triples if str(triple[1]) == prop_uri]
828 for triple in triples_for_prop:
829 new_triple_data = {
830 "triple": (str(triple[0]), str(triple[1]), str(triple[2])),
831 "object": str(triple[2]),
832 "subjectClass": subject_class,
833 "subjectShape": subject_shape,
834 "objectShape": None,
835 }
836 grouped_triples[display_name]["triples"].append(new_triple_data)
839def execute_historical_query(
840 query: str, subject: str, value: str, historical_snapshot: Graph
841) -> tuple[str | None, str | None]:
842 decoded_subject = unquote(subject)
843 decoded_value = unquote(value)
844 query = query.replace("[[subject]]", f"<{decoded_subject}>")
845 query = query.replace("[[value]]", f"<{decoded_value}>")
846 results = historical_snapshot.query(query)
847 for row in select_results(results):
848 if len(row) == _SUBJECT_LABEL_PAIR_LENGTH:
849 return (str(row[0]), str(row[1]))
850 return None, None
853def get_property_order_from_rules(
854 highest_priority_class: str | None, shape_uri: str | None = None
855) -> list[str]:
856 """
857 Extract ordered list of properties from display rules
858 for given entity class and optionally a shape.
860 Args:
861 highest_priority_class: The highest priority class for the entity
862 shape_uri: Optional shape URI for the entity
864 Returns:
865 List of property URIs in the order specified by display rules
866 """
867 if not highest_priority_class:
868 return []
870 rule = find_matching_rule(highest_priority_class, shape_uri)
871 if not rule:
872 return []
874 ordered_properties = []
875 for prop in rule.get("displayProperties", []):
876 if not isinstance(prop, dict):
877 continue
878 if prop.get("isVirtual"):
879 continue # Virtual properties don't have RDF predicates
880 if "property" in prop:
881 ordered_properties.append(prop["property"])
883 return ordered_properties
886def get_predicate_ordering_info(
887 predicate_uri: str,
888 highest_priority_class: str | None,
889 entity_shape: str | None = None,
890) -> str | None:
891 """
892 Check if a predicate is ordered and return its ordering property.
894 Args:
895 predicate_uri: URI of the predicate to check
896 highest_priority_class: The highest priority class for the subject entity
897 entity_shape: Optional shape for the subject entity
899 Returns:
900 The ordering property URI if the predicate is ordered, None otherwise
901 """
902 display_rules = get_display_rules()
903 if not display_rules:
904 return None
906 rule = find_matching_rule(highest_priority_class, entity_shape, display_rules)
907 if not rule:
908 return None
910 for prop in rule.get("displayProperties", []):
911 if not isinstance(prop, dict):
912 continue
913 if prop.get("isVirtual"):
914 continue # Virtual properties don't have RDF predicates or ordering
915 if prop.get("property") == predicate_uri:
916 return prop.get("orderedBy")
918 return None
921def get_shape_order_from_display_rules(
922 highest_priority_class: str | None, entity_shape: str | None, predicate_uri: str
923) -> list[str]:
924 """
925 Get the ordered list of shapes for a specific predicate from display rules.
927 Args:
928 highest_priority_class: The highest priority class for the entity
929 entity_shape: The shape for the subject entity
930 predicate_uri: The predicate URI to get shape ordering for
932 Returns:
933 List of shape URIs in the order specified in
934 displayRules, or empty list if no rules found
935 """
936 display_rules = get_display_rules()
937 if not display_rules:
938 return []
940 rule = find_matching_rule(highest_priority_class, entity_shape, display_rules)
941 if not rule or "displayProperties" not in rule:
942 return []
944 for prop_config in rule["displayProperties"]:
945 if not isinstance(prop_config, dict):
946 continue
947 if prop_config.get("isVirtual"):
948 continue # Virtual properties don't have RDF predicates or display rules
949 if "property" not in prop_config:
950 continue # Defensive check for malformed configuration
951 if prop_config["property"] == predicate_uri and "displayRules" in prop_config:
952 return [
953 display_rule.get("shape")
954 for display_rule in prop_config["displayRules"]
955 if display_rule.get("shape")
956 ]
958 return []
961def get_similarity_properties(
962 entity_key: tuple[str, str | None],
963) -> list[str | dict[str, list[str]]] | None:
964 """Gets the similarity properties configuration for a given entity key.
966 This configuration specifies which properties should be used for similarity matching
967 using a list-based structure supporting OR logic between elements and
968 nested AND logic within elements.
970 Example structures:
971 - ['prop1', 'prop2'] # prop1 OR prop2
972 - [{'and': ['prop3', 'prop4']}] # prop3 AND prop4
973 - ['prop1', {'and': ['prop2', 'prop3']}] # prop1 OR (prop2 AND prop3)
975 Args:
976 entity_key: A tuple (class_uri, shape_uri)
978 Returns:
979 A list where each element is either a property URI string or a dictionary
980 {'and': [list_of_property_uris]}, representing the boolean logic.
981 Returns None if no configuration is found or if the structure is invalid.
982 """
983 class_uri = entity_key[0]
984 shape_uri = entity_key[1]
986 # Find the matching rule
987 rule = find_matching_rule(class_uri, shape_uri)
988 if not rule:
989 return None
991 similarity_props = rule.get("similarity_properties")
993 if not similarity_props or not isinstance(similarity_props, list):
994 return None
996 # Validate each element in the list.
997 validated_props = []
998 for item in similarity_props:
999 if isinstance(item, str):
1000 validated_props.append(item)
1001 elif isinstance(item, dict) and len(item) == 1 and "and" in item:
1002 and_list = item["and"]
1003 if (
1004 isinstance(and_list, list)
1005 and and_list
1006 and all(isinstance(p, str) for p in and_list)
1007 ):
1008 validated_props.append(item)
1009 else:
1010 logging.getLogger(__name__).warning(
1011 "Invalid 'and' group in similarity_properties"
1012 " for class %s. Expected"
1013 " {'and': ['prop_uri', ...]} with"
1014 " a non-empty list of strings.",
1015 class_uri,
1016 )
1017 return None
1018 else:
1019 logging.getLogger(__name__).warning(
1020 "Invalid item format in similarity_properties"
1021 " list for class %s. Expected a property URI"
1022 " string or {'and': [...]} dict.",
1023 class_uri,
1024 )
1025 return None
1027 return (
1028 validated_props or None
1029 ) # Return validated list or None if empty after validation