Coverage for heritrace/utils/display_rules_utils.py: 93%
360 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-10-13 17:12 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-10-13 17:12 +0000
1from collections import OrderedDict
2from typing import Dict, List, Optional, Tuple, Union
3from urllib.parse import unquote
5from heritrace.extensions import (get_custom_filter, get_display_rules,
6 get_form_fields, get_sparql)
7from rdflib import ConjunctiveGraph, Graph, Literal, URIRef
8from rdflib.plugins.sparql.algebra import translateQuery
9from rdflib.plugins.sparql.parser import parseQuery
10from SPARQLWrapper import JSON
13display_rules = get_display_rules()
16def find_matching_rule(class_uri=None, shape_uri=None, rules=None):
17 """
18 Find the most appropriate rule for a given class and/or shape.
19 At least one of class_uri or shape_uri must be provided.
21 Args:
22 class_uri: Optional URI of the class
23 shape_uri: Optional URI of the shape
24 rules: Optional list of rules to search in, defaults to global display_rules
26 Returns:
27 The matching rule or None if no match is found
28 """
29 if not rules:
30 rules = get_display_rules()
31 if not rules:
32 return None
34 # Initialize variables to track potential matches
35 class_match = None
36 shape_match = None
37 highest_priority = float('inf')
39 # Scan all rules to find the best match based on priority
40 for rule in rules:
41 rule_priority = rule.get("priority", 0)
43 # Case 1: Both class and shape match (exact match)
44 if class_uri and shape_uri and \
45 "class" in rule["target"] and rule["target"]["class"] == str(class_uri) and \
46 "shape" in rule["target"] and rule["target"]["shape"] == str(shape_uri):
47 # Exact match always takes highest precedence
48 return rule
50 # Case 2: Only class matches
51 elif class_uri and "class" in rule["target"] and rule["target"]["class"] == str(class_uri) and \
52 "shape" not in rule["target"]:
53 if class_match is None or rule_priority < highest_priority:
54 class_match = rule
55 highest_priority = rule_priority
57 # Case 3: Only shape matches
58 elif shape_uri and "shape" in rule["target"] and rule["target"]["shape"] == str(shape_uri) and \
59 "class" not in rule["target"]:
60 if shape_match is None or rule_priority < highest_priority:
61 shape_match = rule
62 highest_priority = rule_priority
64 # Return the best match based on priority
65 # Shape rules typically have higher specificity, so prefer them if they have equal priority
66 if shape_match and (class_match is None or
67 shape_match.get("priority", 0) <= class_match.get("priority", 0)):
68 return shape_match
69 elif class_match:
70 return class_match
72 return None
75def get_class_priority(entity_key):
76 """
77 Returns the priority of a specific entity key (class_uri, shape_uri).
78 Calculates the priority directly from the display rules.
79 Classes without defined rules receive the lowest priority (highest number).
81 Args:
82 entity_key: A tuple (class_uri, shape_uri)
83 """
84 class_uri = entity_key[0]
85 shape_uri = entity_key[1]
87 rule = find_matching_rule(class_uri, shape_uri)
88 return rule.get("priority", 0) if rule else float('inf')
91def is_entity_type_visible(entity_key):
92 """
93 Determines if an entity type should be displayed.
95 Args:
96 entity_key: A tuple (class_uri, shape_uri)
97 """
98 class_uri = entity_key[0]
99 shape_uri = entity_key[1]
101 rule = find_matching_rule(class_uri, shape_uri)
102 return rule.get("shouldBeDisplayed", True) if rule else True
105def get_sortable_properties(entity_key: Tuple[str, str]) -> List[Dict[str, str]]:
106 """
107 Gets the sortable properties from display rules for an entity type and/or shape.
108 Infers the sorting type from form_fields_cache.
110 Args:
111 entity_key: A tuple (class_uri, shape_uri)
113 Returns:
114 List of dictionaries with sorting information
115 """
116 display_rules = get_display_rules()
117 if not display_rules:
118 return []
120 form_fields = get_form_fields()
122 class_uri = entity_key[0]
123 shape_uri = entity_key[1]
125 rule = find_matching_rule(class_uri, shape_uri, display_rules)
126 if not rule or "sortableBy" not in rule:
127 return []
129 sort_props = []
130 for sort_config in rule["sortableBy"]:
131 prop = sort_config.copy()
133 for display_prop in rule["displayProperties"]:
134 if display_prop["property"] == prop["property"]:
135 if "displayRules" in display_prop:
136 prop["displayName"] = display_prop["displayRules"][0][
137 "displayName"
138 ]
139 else:
140 prop["displayName"] = display_prop.get(
141 "displayName", prop["property"]
142 )
143 break
145 # Default to string sorting
146 prop["sortType"] = "string"
148 # Try to determine the sort type from form fields
149 if form_fields:
150 # First try with the exact entity_key (class, shape)
151 if entity_key in form_fields and prop["property"] in form_fields[entity_key]:
152 field_info = form_fields[entity_key][prop["property"]][0] # Take the first field definition
153 prop["sortType"] = determine_sort_type(field_info)
155 sort_props.append(prop)
157 return sort_props
160def determine_sort_type(field_info):
161 """Helper function to determine sort type from field info."""
162 # If there's a shape, it's a reference to an entity (sort by label)
163 if field_info.get("nodeShape"):
164 return "string"
165 # Otherwise look at the datatypes
166 elif field_info.get("datatypes"):
167 datatype = str(field_info["datatypes"][0]).lower()
168 if any(t in datatype for t in ["date", "time"]):
169 return "date"
170 elif any(
171 t in datatype
172 for t in ["int", "float", "decimal", "double", "number"]
173 ):
174 return "number"
175 elif "boolean" in datatype:
176 return "boolean"
177 # Default to string
178 return "string"
181def get_highest_priority_class(subject_classes):
182 """
183 Find the highest priority class from the given list of classes.
185 Args:
186 subject_classes: List of class URIs
188 Returns:
189 The highest priority class or None if no classes are provided
190 """
191 from heritrace.utils.shacl_utils import determine_shape_for_classes
193 if not subject_classes:
194 return None
196 highest_priority = float('inf')
197 highest_priority_class = None
199 for class_uri in subject_classes:
200 class_uri = str(class_uri)
201 shape = determine_shape_for_classes([class_uri])
202 entity_key = (class_uri, shape)
203 priority = get_class_priority(entity_key)
204 if priority < highest_priority:
205 highest_priority = priority
206 highest_priority_class = class_uri
208 if highest_priority_class is None and subject_classes:
209 highest_priority_class = str(subject_classes[0])
211 return highest_priority_class
214def get_grouped_triples(
215 subject: URIRef,
216 triples: List[Tuple[URIRef, URIRef, URIRef|Literal]],
217 valid_predicates_info: List[str],
218 historical_snapshot: Optional[Graph] = None,
219 highest_priority_class: Optional[str] = None,
220 highest_priority_shape: Optional[str] = None
221) -> Tuple[OrderedDict, set, dict]:
222 """
223 This function groups the triples based on the display rules.
224 It also fetches the values for the properties that are configured to be fetched from the query.
226 Args:
227 subject: The subject URI
228 triples: List of triples for the subject
229 valid_predicates_info: List of valid predicates for the subject
230 historical_snapshot: Optional historical snapshot graph
231 highest_priority_class: The highest priority class URI for the subject
232 highest_priority_shape: The highest priority shape URI for the subject
234 Returns:
235 Tuple of grouped triples, relevant properties, and fetched values map
237 Note:
238 relevant_properties contains all properties that should be considered
239 "relevant" for UI operations (adding/deleting). This includes:
240 - Properties configured in display rules when rules exist and match
241 - ALL valid properties when no display rules exist or no rules match
242 This ensures users can always interact with entities even without display rules.
243 """
244 display_rules = get_display_rules()
245 form_fields = get_form_fields()
247 grouped_triples = OrderedDict()
248 relevant_properties = set()
249 fetched_values_map = dict()
251 matching_rule = find_matching_rule(highest_priority_class, highest_priority_shape, display_rules)
252 matching_form_field = form_fields.get((highest_priority_class, highest_priority_shape))
254 ordered_properties = []
255 if display_rules and matching_rule:
256 for prop_config in matching_rule.get("displayProperties", []):
257 if prop_config.get("isVirtual"):
258 prop_uri = prop_config.get("displayName")
259 else:
260 prop_uri = prop_config.get("property")
261 if prop_uri and prop_uri not in ordered_properties:
262 ordered_properties.append(prop_uri)
264 for prop_uri in valid_predicates_info:
265 if prop_uri not in ordered_properties:
266 ordered_properties.append(prop_uri)
268 for prop_uri in ordered_properties:
269 current_prop_config = None
271 if display_rules and matching_rule:
272 for prop_config in matching_rule.get("displayProperties", []):
273 config_identifier = prop_config.get("displayName") if prop_config.get("isVirtual") else prop_config.get("property")
274 if config_identifier == prop_uri:
275 current_prop_config = prop_config
276 break
278 current_form_field = matching_form_field.get(prop_uri) if matching_form_field else None
280 if current_prop_config:
281 if "displayRules" in current_prop_config:
282 is_ordered = "orderedBy" in current_prop_config
283 order_property = current_prop_config.get("orderedBy")
285 for display_rule_nested in current_prop_config["displayRules"]:
286 display_name_nested = display_rule_nested.get(
287 "displayName", prop_uri
288 )
289 relevant_properties.add(prop_uri)
290 object_shape = display_rule_nested.get("shape")
291 if current_prop_config.get("isVirtual"):
292 process_virtual_property_display(
293 display_name_nested,
294 current_prop_config,
295 subject,
296 grouped_triples,
297 fetched_values_map,
298 historical_snapshot,
299 highest_priority_shape,
300 highest_priority_class
301 )
302 else:
303 process_display_rule(
304 display_name_nested,
305 prop_uri,
306 display_rule_nested,
307 subject,
308 triples,
309 grouped_triples,
310 fetched_values_map,
311 historical_snapshot,
312 highest_priority_shape,
313 object_shape,
314 highest_priority_class
315 )
316 if is_ordered and not current_prop_config.get("isVirtual", False):
317 grouped_triples[display_name_nested]["is_draggable"] = True
318 grouped_triples[display_name_nested]["ordered_by"] = order_property
319 process_ordering(
320 subject,
321 current_prop_config,
322 order_property,
323 grouped_triples,
324 display_name_nested,
325 fetched_values_map,
326 historical_snapshot,
327 )
329 # Ensure the grouped_triples entry exists
330 if display_name_nested not in grouped_triples:
331 grouped_triples[display_name_nested] = {
332 "property": prop_uri,
333 "triples": [],
334 "subjectClass": highest_priority_class,
335 "subjectShape": highest_priority_shape,
336 "objectShape": display_rule_nested.get("shape")
337 }
339 if "intermediateRelation" in display_rule_nested or "intermediateRelation" in current_prop_config:
340 # Set intermediateRelation from the appropriate source
341 if "intermediateRelation" in display_rule_nested:
342 grouped_triples[display_name_nested]["intermediateRelation"] = display_rule_nested["intermediateRelation"]
343 else: # Must be in current_prop_config based on the if condition
344 grouped_triples[display_name_nested]["intermediateRelation"] = current_prop_config["intermediateRelation"]
346 else:
347 display_name_simple = current_prop_config.get("displayName", prop_uri)
348 # Only add non-virtual properties to relevant_properties
349 # Virtual properties are handled separately in entity.py
350 if not current_prop_config.get("isVirtual"):
351 relevant_properties.add(prop_uri)
353 object_shape = None
354 if current_form_field:
355 for form_field in current_form_field:
356 object_shape = form_field.get("nodeShape")
357 break
359 if current_prop_config.get("isVirtual"):
360 process_virtual_property_display(
361 display_name_simple,
362 current_prop_config,
363 subject,
364 grouped_triples,
365 fetched_values_map,
366 historical_snapshot,
367 highest_priority_shape,
368 highest_priority_class
369 )
370 else:
371 process_display_rule(
372 display_name_simple,
373 prop_uri,
374 current_prop_config,
375 subject,
376 triples,
377 grouped_triples,
378 fetched_values_map,
379 historical_snapshot,
380 highest_priority_shape,
381 object_shape,
382 highest_priority_class
383 )
384 if "orderedBy" in current_prop_config and not current_prop_config.get("isVirtual", False):
385 if display_name_simple not in grouped_triples:
386 grouped_triples[display_name_simple] = {"property": prop_uri, "triples": [], "subjectClass": highest_priority_class, "subjectShape": highest_priority_shape, "objectShape": current_prop_config.get("shape")}
387 grouped_triples[display_name_simple]["is_draggable"] = True
388 grouped_triples[display_name_simple]["ordered_by"] = current_prop_config.get("orderedBy")
389 process_ordering(
390 subject,
391 current_prop_config,
392 current_prop_config.get("orderedBy"),
393 grouped_triples,
394 display_name_simple,
395 fetched_values_map,
396 historical_snapshot,
397 highest_priority_shape
398 )
399 if "intermediateRelation" in current_prop_config:
400 if display_name_simple not in grouped_triples:
401 grouped_triples[display_name_simple] = {"property": prop_uri, "triples": [], "subjectClass": highest_priority_class, "subjectShape": highest_priority_shape, "objectShape": current_prop_config.get("shape")}
402 grouped_triples[display_name_simple]["intermediateRelation"] = current_prop_config["intermediateRelation"]
403 else:
404 # Property without specific configuration - add to relevant_properties
405 # Don't process properties without configuration (they are not virtual in this case)
406 relevant_properties.add(prop_uri)
407 process_default_property(prop_uri, triples, grouped_triples, highest_priority_shape, highest_priority_class)
408 else:
409 # No display rules or no matching rule - add all properties to relevant_properties
410 relevant_properties.add(prop_uri)
411 process_default_property(prop_uri, triples, grouped_triples, highest_priority_shape, highest_priority_class)
413 grouped_triples = OrderedDict(grouped_triples)
414 return grouped_triples, relevant_properties
417def process_display_rule(
418 display_name,
419 prop_uri,
420 rule,
421 subject,
422 triples,
423 grouped_triples,
424 fetched_values_map,
425 historical_snapshot=None,
426 subject_shape=None,
427 object_shape=None,
428 subject_class=None,
429):
430 if display_name not in grouped_triples:
431 grouped_triples[display_name] = {
432 "property": prop_uri,
433 "triples": [],
434 "subjectClass": subject_class,
435 "subjectShape": subject_shape,
436 "objectShape": object_shape,
437 "intermediateRelation": rule.get("intermediateRelation"),
438 }
439 for triple in triples:
440 if str(triple[1]) == prop_uri:
441 if rule.get("fetchValueFromQuery"):
442 if historical_snapshot:
443 result, external_entity = execute_historical_query(
444 rule["fetchValueFromQuery"],
445 subject,
446 triple[2],
447 historical_snapshot,
448 )
449 else:
450 result, external_entity = execute_sparql_query(
451 rule["fetchValueFromQuery"], subject, triple[2]
452 )
453 if result:
454 fetched_values_map[str(result)] = str(triple[2])
455 new_triple = (str(triple[0]), str(triple[1]), str(result))
456 object_uri = str(triple[2])
457 new_triple_data = {
458 "triple": new_triple,
459 "external_entity": external_entity,
460 "object": object_uri,
461 "subjectClass": subject_class,
462 "subjectShape": subject_shape,
463 "objectShape": object_shape,
464 }
465 grouped_triples[display_name]["triples"].append(new_triple_data)
466 else:
467 if str(triple[1]) == 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type':
468 from heritrace.utils.shacl_utils import determine_shape_for_classes
469 object_class_shape = determine_shape_for_classes([triple[2]])
470 result = get_custom_filter().human_readable_class((triple[2], object_class_shape))
471 else:
472 result = triple[2]
474 object_uri = str(triple[2])
476 new_triple_data = {
477 "triple": (str(triple[0]), str(triple[1]), result),
478 "object": object_uri,
479 "subjectClass": subject_class,
480 "subjectShape": subject_shape,
481 "objectShape": object_shape,
482 }
483 grouped_triples[display_name]["triples"].append(new_triple_data)
486def process_virtual_property_display(
487 display_name: str,
488 prop_config: dict,
489 subject: URIRef,
490 grouped_triples: OrderedDict,
491 fetched_values_map: dict,
492 historical_snapshot: Optional[Graph] = None,
493 subject_shape: Optional[str] = None,
494 subject_class: Optional[str] = None
495):
496 """Process virtual properties by querying for entities that reference the current entity."""
498 implementation = prop_config.get("implementedVia", {})
499 field_overrides = implementation.get("fieldOverrides", {})
500 target = implementation.get("target", {})
501 target_class = target.get("class")
503 # Find which field should reference the current entity
504 reference_field = None
505 for field_uri, override in field_overrides.items():
506 if override.get("value") == "${currentEntity}":
507 reference_field = field_uri
508 break
510 if not reference_field:
511 return
513 decoded_subject = unquote(str(subject))
515 # Query for entities that reference the current entity via the reference field
516 query = f"""
517 SELECT DISTINCT ?entity
518 WHERE {{
519 ?entity <{reference_field}> <{decoded_subject}> .
520 """
522 if target_class:
523 query += f"""
524 ?entity a <{target_class}> .
525 """
527 query += """
528 }
529 """
531 if historical_snapshot:
532 # Execute query on historical snapshot
533 results = list(historical_snapshot.query(query))
534 entity_uris = [str(row[0]) for row in results]
535 else:
536 # Execute query on live triplestore
537 sparql = get_sparql()
538 sparql.setQuery(query)
539 sparql.setReturnFormat(JSON)
540 results = sparql.query().convert().get("results", {}).get("bindings", [])
541 entity_uris = [res["entity"]["value"] for res in results]
543 # Now fetch display values for these entities if fetchValueFromQuery is configured
545 if prop_config.get("fetchValueFromQuery") and entity_uris:
547 if display_name not in grouped_triples:
548 grouped_triples[display_name] = {
549 "property": display_name, # Use display name as identifier for virtual properties
550 "triples": [],
551 "subjectClass": subject_class,
552 "subjectShape": subject_shape,
553 "objectShape": None, # Should be None for virtual properties to match key format
554 "is_virtual": True
555 }
557 for entity_uri in entity_uris:
558 # Execute the fetch query for each entity
559 if historical_snapshot:
560 result, external_entity = execute_historical_query(
561 prop_config["fetchValueFromQuery"],
562 subject,
563 URIRef(entity_uri),
564 historical_snapshot
565 )
566 else:
567 result, external_entity = execute_sparql_query(
568 prop_config["fetchValueFromQuery"],
569 str(subject),
570 entity_uri
571 )
573 if result:
574 fetched_values_map[str(result)] = entity_uri
575 new_triple_data = {
576 "triple": (str(subject), display_name, str(result)),
577 "external_entity": external_entity,
578 "object": entity_uri,
579 "subjectClass": subject_class,
580 "subjectShape": subject_shape,
581 "objectShape": target.get("shape"),
582 "is_virtual": True
583 }
584 grouped_triples[display_name]["triples"].append(new_triple_data)
585 else:
586 # Even if no entities are found, we should still create the entry for virtual properties
587 # so they can be added via the interface
589 if display_name not in grouped_triples:
590 grouped_triples[display_name] = {
591 "property": display_name, # Use display name as identifier for virtual properties
592 "triples": [],
593 "subjectClass": subject_class,
594 "subjectShape": subject_shape,
595 "objectShape": None, # Should be None for virtual properties to match key format
596 "is_virtual": True
597 }
600def execute_sparql_query(query: str, subject: str, value: str) -> Tuple[str, str]:
601 sparql = get_sparql()
603 decoded_subject = unquote(subject)
604 decoded_value = unquote(value)
605 query = query.replace("[[subject]]", f"<{decoded_subject}>")
606 query = query.replace("[[value]]", f"<{decoded_value}>")
607 sparql.setQuery(query)
608 sparql.setReturnFormat(JSON)
609 results = sparql.query().convert().get("results", {}).get("bindings", [])
610 if results:
611 parsed_query = parseQuery(query)
612 algebra_query = translateQuery(parsed_query).algebra
613 variable_order = algebra_query["PV"]
614 result = results[0]
615 values = [
616 result.get(str(var_name), {}).get("value", None)
617 for var_name in variable_order
618 ]
619 first_value = values[0] if len(values) > 0 else None
620 second_value = values[1] if len(values) > 1 else None
621 return (first_value, second_value)
622 return None, None
625def process_ordering(
626 subject,
627 prop,
628 order_property,
629 grouped_triples,
630 display_name,
631 fetched_values_map,
632 historical_snapshot: ConjunctiveGraph | Graph | None = None,
633):
634 def get_ordered_sequence(order_results):
635 order_map = {}
636 for res in order_results:
637 if isinstance(res, dict): # For live triplestore results
638 ordered_entity = res["orderedEntity"]["value"]
639 next_value = res["nextValue"]["value"]
640 else: # For historical snapshot results
641 ordered_entity = str(res[0])
642 next_value = str(res[1])
644 order_map[str(ordered_entity)] = (
645 None if str(next_value) == "NONE" else str(next_value)
646 )
648 all_sequences = []
649 start_elements = set(order_map.keys()) - set(order_map.values())
650 while start_elements:
651 sequence = []
652 current_element = start_elements.pop()
653 while current_element in order_map:
654 sequence.append(current_element)
655 current_element = order_map[current_element]
656 all_sequences.append(sequence)
657 return all_sequences
659 decoded_subject = unquote(subject)
661 sparql = get_sparql()
663 order_query = f"""
664 SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
665 WHERE {{
666 <{decoded_subject}> <{prop['property']}> ?orderedEntity.
667 OPTIONAL {{
668 ?orderedEntity <{order_property}> ?next.
669 }}
670 }}
671 """
672 if historical_snapshot:
673 order_results = list(historical_snapshot.query(order_query))
674 else:
675 sparql.setQuery(order_query)
676 sparql.setReturnFormat(JSON)
677 order_results = sparql.query().convert().get("results", {}).get("bindings", [])
679 order_sequences = get_ordered_sequence(order_results)
680 for sequence in order_sequences:
681 grouped_triples[display_name]["triples"].sort(
682 key=lambda x: (
683 sequence.index(
684 fetched_values_map.get(str(x["triple"][2]), str(x["triple"][2]))
685 )
686 if fetched_values_map.get(str(x["triple"][2]), str(x["triple"][2]))
687 in sequence
688 else float("inf")
689 )
690 )
693def process_default_property(prop_uri, triples, grouped_triples, subject_shape=None, subject_class=None):
694 display_name = prop_uri
695 grouped_triples[display_name] = {
696 "property": prop_uri,
697 "triples": [],
698 "subjectClass": subject_class,
699 "subjectShape": subject_shape,
700 "objectShape": None
701 }
702 triples_for_prop = [triple for triple in triples if str(triple[1]) == prop_uri]
703 for triple in triples_for_prop:
704 new_triple_data = {
705 "triple": (str(triple[0]), str(triple[1]), str(triple[2])),
706 "object": str(triple[2]),
707 "subjectClass": subject_class,
708 "subjectShape": subject_shape,
709 "objectShape": None,
710 }
711 grouped_triples[display_name]["triples"].append(new_triple_data)
714def execute_historical_query(
715 query: str, subject: str, value: str, historical_snapshot: Graph
716) -> Tuple[str, str]:
717 decoded_subject = unquote(subject)
718 decoded_value = unquote(value)
719 query = query.replace("[[subject]]", f"<{decoded_subject}>")
720 query = query.replace("[[value]]", f"<{decoded_value}>")
721 results = historical_snapshot.query(query)
722 if results:
723 for result in results:
724 if len(result) == 2:
725 return (str(result[0]), str(result[1]))
726 return None, None
729def get_property_order_from_rules(highest_priority_class: str, shape_uri: str = None):
730 """
731 Extract ordered list of properties from display rules for given entity class and optionally a shape.
733 Args:
734 highest_priority_class: The highest priority class for the entity
735 shape_uri: Optional shape URI for the entity
737 Returns:
738 List of property URIs in the order specified by display rules
739 """
740 if not highest_priority_class:
741 return []
743 rule = find_matching_rule(highest_priority_class, shape_uri)
744 if not rule:
745 return []
747 ordered_properties = []
748 for prop in rule.get("displayProperties", []):
749 if isinstance(prop, dict) and "property" in prop:
750 ordered_properties.append(prop["property"])
752 return ordered_properties
755def get_predicate_ordering_info(predicate_uri: str, highest_priority_class: str, entity_shape: str = None) -> Optional[str]:
756 """
757 Check if a predicate is ordered and return its ordering property.
759 Args:
760 predicate_uri: URI of the predicate to check
761 highest_priority_class: The highest priority class for the subject entity
762 entity_shape: Optional shape for the subject entity
764 Returns:
765 The ordering property URI if the predicate is ordered, None otherwise
766 """
767 display_rules = get_display_rules()
768 if not display_rules:
769 return None
771 rule = find_matching_rule(highest_priority_class, entity_shape, display_rules)
772 if not rule:
773 return None
775 for prop in rule.get("displayProperties", []):
776 if isinstance(prop, dict) and prop.get("property") == predicate_uri:
777 return prop.get("orderedBy")
779 return None
782def get_shape_order_from_display_rules(highest_priority_class: str, entity_shape: str, predicate_uri: str) -> list:
783 """
784 Get the ordered list of shapes for a specific predicate from display rules.
786 Args:
787 highest_priority_class: The highest priority class for the entity
788 entity_shape: The shape for the subject entity
789 predicate_uri: The predicate URI to get shape ordering for
791 Returns:
792 List of shape URIs in the order specified in displayRules, or empty list if no rules found
793 """
794 display_rules = get_display_rules()
795 if not display_rules:
796 return []
798 rule = find_matching_rule(highest_priority_class, entity_shape, display_rules)
799 if not rule or "displayProperties" not in rule:
800 return []
802 for prop_config in rule["displayProperties"]:
803 if prop_config["property"] == predicate_uri:
804 if "displayRules" in prop_config:
805 return [display_rule.get("shape") for display_rule in prop_config["displayRules"]
806 if display_rule.get("shape")]
808 return []
811def get_similarity_properties(entity_key: Tuple[str, str]) -> Optional[List[Union[str, Dict[str, List[str]]]]]:
812 """Gets the similarity properties configuration for a given entity key.
814 This configuration specifies which properties should be used for similarity matching
815 using a list-based structure supporting OR logic between elements and
816 nested AND logic within elements.
818 Example structures:
819 - ['prop1', 'prop2'] # prop1 OR prop2
820 - [{'and': ['prop3', 'prop4']}] # prop3 AND prop4
821 - ['prop1', {'and': ['prop2', 'prop3']}] # prop1 OR (prop2 AND prop3)
823 Args:
824 entity_key: A tuple (class_uri, shape_uri)
826 Returns:
827 A list where each element is either a property URI string or a dictionary
828 {'and': [list_of_property_uris]}, representing the boolean logic.
829 Returns None if no configuration is found or if the structure is invalid.
830 """
831 class_uri = entity_key[0]
832 shape_uri = entity_key[1]
834 # Find the matching rule
835 rule = find_matching_rule(class_uri, shape_uri)
836 if not rule:
837 return None
839 similarity_props = rule.get("similarity_properties")
841 if not similarity_props or not isinstance(similarity_props, list):
842 return None
844 # Validate each element in the list.
845 validated_props = []
846 for item in similarity_props:
847 if isinstance(item, str):
848 validated_props.append(item)
849 elif isinstance(item, dict) and len(item) == 1 and "and" in item:
850 and_list = item["and"]
851 if isinstance(and_list, list) and and_list and all(isinstance(p, str) for p in and_list):
852 validated_props.append(item)
853 else:
854 print(
855 f"Warning: Invalid 'and' group in similarity_properties" +
856 (f" for class {class_uri}" if class_uri else "") +
857 (f" with shape {shape_uri}" if shape_uri else "") +
858 f". Expected {{'and': ['prop_uri', ...]}} with a non-empty list of strings."
859 )
860 return None # Invalid 'and' group structure
861 else:
862 print(
863 f"Warning: Invalid item format in similarity_properties list" +
864 (f" for class {class_uri}" if class_uri else "") +
865 (f" with shape {shape_uri}" if shape_uri else "") +
866 f". Expected a property URI string or {{'and': [...]}} dict."
867 )
868 return None # Invalid item type
870 return validated_props if validated_props else None # Return validated list or None if empty after validation