Coverage for heritrace / utils / display_rules_utils.py: 90%
374 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from collections import OrderedDict
6from typing import Dict, List, Optional, Tuple, Union
7from urllib.parse import unquote
9from heritrace.extensions import (get_custom_filter, get_display_rules,
10 get_form_fields, get_sparql)
11from rdflib import Dataset, Graph, Literal, URIRef
12from rdflib.plugins.sparql.algebra import translateQuery
13from rdflib.plugins.sparql.parser import parseQuery
14from SPARQLWrapper import JSON
# Display rules fetched once, when this module is imported.
# NOTE(review): the functions visible in this module call get_display_rules()
# themselves (or bind a local variable with this same name), so confirm whether
# this module-level value is still read anywhere before relying on it.
display_rules = get_display_rules()
def find_matching_rule(class_uri=None, shape_uri=None, rules=None):
    """
    Find the most appropriate rule for a given class and/or shape.
    At least one of class_uri or shape_uri must be provided.

    Matching proceeds in three tiers:

    1. A rule whose target names both the given class and the given shape
       is an exact match and is returned as soon as it is encountered.
    2. Otherwise the best class-only rule (target has "class" but no
       "shape") and the best shape-only rule (target has "shape" but no
       "class") are tracked independently. A lower "priority" number wins;
       a missing "priority" counts as 0; on ties the first rule seen is kept.
    3. Between the two survivors, the shape-only rule is returned when its
       priority is lower than or equal to the class-only rule's priority.

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        rules: Optional list of rules to search in; when empty or None the
            rules returned by get_display_rules() are used

    Returns:
        The matching rule or None if no match is found
    """
    if not rules:
        rules = get_display_rules()
    if not rules:
        return None

    # Class-only and shape-only candidates are tracked with separate priority
    # values: sharing one tracker lets a candidate of one kind raise the bar
    # for the other kind and allows a worse rule to replace a better one.
    class_match = None
    class_priority = float('inf')
    shape_match = None
    shape_priority = float('inf')

    for rule in rules:
        target = rule["target"]
        rule_priority = rule.get("priority", 0)

        has_class = "class" in target
        has_shape = "shape" in target
        class_ok = bool(class_uri) and has_class and target["class"] == str(class_uri)
        shape_ok = bool(shape_uri) and has_shape and target["shape"] == str(shape_uri)

        # Case 1: Both class and shape match (exact match always wins)
        if class_ok and shape_ok:
            return rule

        # Case 2: Only class matches, and the rule does not constrain the shape
        if class_ok and not has_shape:
            if class_match is None or rule_priority < class_priority:
                class_match = rule
                class_priority = rule_priority

        # Case 3: Only shape matches, and the rule does not constrain the class
        elif shape_ok and not has_class:
            if shape_match is None or rule_priority < shape_priority:
                shape_match = rule
                shape_priority = rule_priority

    # Shape rules are treated as more specific, so they win on equal priority
    if shape_match and (class_match is None or shape_priority <= class_priority):
        return shape_match
    elif class_match:
        return class_match

    return None
def get_class_priority(entity_key):
    """
    Look up the display priority of an entity key.

    The priority is read from the rule that find_matching_rule selects for
    the key. A matched rule without an explicit "priority" counts as 0; a key
    with no matching rule gets float('inf'), i.e. it sorts after every rule.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The rule's priority, or float('inf') when no rule matches
    """
    matched = find_matching_rule(entity_key[0], entity_key[1])
    if not matched:
        return float('inf')
    return matched.get("priority", 0)
def is_entity_type_visible(entity_key):
    """
    Tell whether entities of the given type should be displayed.

    The answer is the "shouldBeDisplayed" flag of the rule selected by
    find_matching_rule; types with no matching rule, and rules without the
    flag, are visible.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The value of the rule's "shouldBeDisplayed" flag, or True by default
    """
    matched = find_matching_rule(entity_key[0], entity_key[1])
    if not matched:
        return True
    return matched.get("shouldBeDisplayed", True)
def get_sortable_properties(entity_key: Tuple[str, str]) -> List[Dict[str, str]]:
    """
    Gets the sortable properties from display rules for an entity type and/or shape.
    Infers the sorting type from the form fields returned by get_form_fields().

    Each entry of the rule's "sortableBy" list is copied and enriched with:
      - "displayName": taken from the display property with the same
        "property" value, when one exists (first nested display rule if the
        property has "displayRules", otherwise the property's own name);
      - "sortType": "string" unless the form fields for this exact
        entity_key describe the property, in which case determine_sort_type
        decides.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        List of dictionaries with sorting information
    """
    display_rules = get_display_rules()
    if not display_rules:
        return []

    form_fields = get_form_fields()

    class_uri = entity_key[0]
    shape_uri = entity_key[1]

    rule = find_matching_rule(class_uri, shape_uri, display_rules)
    if not rule or "sortableBy" not in rule:
        return []

    # A rule may declare sortable properties without display properties
    display_properties = rule.get("displayProperties", [])

    sort_props = []
    for sort_config in rule["sortableBy"]:
        prop = sort_config.copy()
        prop_uri = prop["property"]

        for display_prop in display_properties:
            # Virtual display properties are identified by displayName and may
            # have no "property" key, so the lookup must not index blindly.
            if not isinstance(display_prop, dict):
                continue
            if display_prop.get("property") != prop_uri:
                continue

            nested_rules = display_prop.get("displayRules")
            if nested_rules:
                prop["displayName"] = nested_rules[0].get("displayName", prop_uri)
            else:
                prop["displayName"] = display_prop.get("displayName", prop_uri)
            break

        # Default to string sorting
        prop["sortType"] = "string"

        # Try to determine the sort type from form fields, using the exact
        # entity_key (class, shape)
        if form_fields:
            if entity_key in form_fields and prop_uri in form_fields[entity_key]:
                field_definitions = form_fields[entity_key][prop_uri]
                if field_definitions:
                    # Take the first field definition
                    prop["sortType"] = determine_sort_type(field_definitions[0])

        sort_props.append(prop)

    return sort_props
def determine_sort_type(field_info):
    """
    Map a form-field description to one of the sort types
    "string", "date", "number" or "boolean".

    A field with a "nodeShape" sorts as "string". Otherwise the first entry
    of "datatypes" is lower-cased and searched for type keywords, tested in
    the order date -> number -> boolean. Anything else sorts as "string".
    """
    if field_info.get("nodeShape"):
        return "string"

    if not field_info.get("datatypes"):
        return "string"

    first_datatype = str(field_info["datatypes"][0]).lower()

    # Order matters: earlier groups take precedence over later ones.
    keyword_groups = (
        (("date", "time"), "date"),
        (("int", "float", "decimal", "double", "number"), "number"),
        (("boolean",), "boolean"),
    )
    for keywords, sort_type in keyword_groups:
        for keyword in keywords:
            if keyword in first_datatype:
                return sort_type

    return "string"
def get_highest_priority_class(subject_classes):
    """
    Pick the class with the best (numerically lowest) display priority.

    Each class is paired with the shape returned by
    determine_shape_for_classes and ranked with get_class_priority. On ties
    the class seen first wins. If no class ranks better than float('inf'),
    the first class in the list is returned.

    Args:
        subject_classes: List of class URIs

    Returns:
        The highest priority class as a string, or None if no classes are
        provided
    """
    # Imported here rather than at module level, as in the original code.
    from heritrace.utils.shacl_utils import determine_shape_for_classes

    if not subject_classes:
        return None

    best_class = None
    best_priority = float('inf')

    for candidate in map(str, subject_classes):
        candidate_shape = determine_shape_for_classes([candidate])
        candidate_priority = get_class_priority((candidate, candidate_shape))
        if candidate_priority < best_priority:
            best_class, best_priority = candidate, candidate_priority

    if best_class is None and subject_classes:
        best_class = str(subject_classes[0])

    return best_class
def get_grouped_triples(
    subject: URIRef,
    triples: List[Tuple[URIRef, URIRef, URIRef|Literal]],
    valid_predicates_info: List[str],
    historical_snapshot: Optional[Graph] = None,
    highest_priority_class: Optional[str] = None,
    highest_priority_shape: Optional[str] = None
) -> Tuple[OrderedDict, set]:
    """
    This function groups the triples based on the display rules.
    It also fetches the values for the properties that are configured to be fetched from the query.

    Args:
        subject: The subject URI
        triples: List of triples for the subject
        valid_predicates_info: List of valid predicates for the subject
        historical_snapshot: Optional historical snapshot graph
        highest_priority_class: The highest priority class URI for the subject
        highest_priority_shape: The highest priority shape URI for the subject

    Returns:
        Tuple of (grouped triples, relevant properties). The map of fetched
        values is built and used internally for ordering but is not returned.

    Note:
        relevant_properties contains all properties that should be considered
        "relevant" for UI operations (adding/deleting). This includes:
        - Properties configured in display rules when rules exist and match
        - ALL valid properties when no display rules exist or no rules match
        This ensures users can always interact with entities even without display rules.
    """
    display_rules = get_display_rules()
    form_fields = get_form_fields()

    grouped_triples = OrderedDict()
    relevant_properties = set()
    fetched_values_map = dict()

    matching_rule = find_matching_rule(highest_priority_class, highest_priority_shape, display_rules)
    # Guard against a missing/empty form-field registry before looking up the key
    matching_form_field = (
        form_fields.get((highest_priority_class, highest_priority_shape))
        if form_fields
        else None
    )

    # Properties configured in the matching rule come first, in rule order;
    # virtual properties are identified by their displayName.
    ordered_properties = []
    if display_rules and matching_rule:
        for prop_config in matching_rule.get("displayProperties", []):
            if prop_config.get("isVirtual"):
                prop_uri = prop_config.get("displayName")
            else:
                prop_uri = prop_config.get("property")
            if prop_uri and prop_uri not in ordered_properties:
                ordered_properties.append(prop_uri)

    # Then every remaining valid predicate, in the order given
    for prop_uri in valid_predicates_info:
        if prop_uri not in ordered_properties:
            ordered_properties.append(prop_uri)

    for prop_uri in ordered_properties:
        current_prop_config = None

        if display_rules and matching_rule:
            for prop_config in matching_rule.get("displayProperties", []):
                config_identifier = prop_config.get("displayName") if prop_config.get("isVirtual") else prop_config.get("property")
                if config_identifier == prop_uri:
                    current_prop_config = prop_config
                    break

            current_form_field = matching_form_field.get(prop_uri) if matching_form_field else None

            if current_prop_config:
                if "displayRules" in current_prop_config:
                    is_ordered = "orderedBy" in current_prop_config
                    order_property = current_prop_config.get("orderedBy")

                    for display_rule_nested in current_prop_config["displayRules"]:
                        display_name_nested = display_rule_nested.get(
                            "displayName", prop_uri
                        )
                        relevant_properties.add(prop_uri)
                        object_shape = display_rule_nested.get("shape")
                        if current_prop_config.get("isVirtual"):
                            process_virtual_property_display(
                                display_name_nested,
                                current_prop_config,
                                subject,
                                grouped_triples,
                                fetched_values_map,
                                historical_snapshot,
                                highest_priority_shape,
                                highest_priority_class
                            )
                        else:
                            process_display_rule(
                                display_name_nested,
                                prop_uri,
                                display_rule_nested,
                                subject,
                                triples,
                                grouped_triples,
                                fetched_values_map,
                                historical_snapshot,
                                highest_priority_shape,
                                object_shape,
                                highest_priority_class
                            )
                        if is_ordered and not current_prop_config.get("isVirtual", False):
                            grouped_triples[display_name_nested]["is_draggable"] = True
                            grouped_triples[display_name_nested]["ordered_by"] = order_property
                            process_ordering(
                                subject,
                                current_prop_config,
                                order_property,
                                grouped_triples,
                                display_name_nested,
                                fetched_values_map,
                                historical_snapshot,
                            )

                        # Ensure the grouped_triples entry exists
                        if display_name_nested not in grouped_triples:
                            grouped_triples[display_name_nested] = {
                                "property": prop_uri,
                                "triples": [],
                                "subjectClass": highest_priority_class,
                                "subjectShape": highest_priority_shape,
                                "objectShape": display_rule_nested.get("shape")
                            }

                        if "intermediateRelation" in display_rule_nested or "intermediateRelation" in current_prop_config:
                            # The nested rule's setting takes precedence over the property-level one
                            if "intermediateRelation" in display_rule_nested:
                                grouped_triples[display_name_nested]["intermediateRelation"] = display_rule_nested["intermediateRelation"]
                            else:  # Must be in current_prop_config based on the if condition
                                grouped_triples[display_name_nested]["intermediateRelation"] = current_prop_config["intermediateRelation"]

                else:
                    display_name_simple = current_prop_config.get("displayName", prop_uri)
                    # Only add non-virtual properties to relevant_properties
                    # Virtual properties are handled separately in entity.py
                    if not current_prop_config.get("isVirtual"):
                        relevant_properties.add(prop_uri)

                    # The object shape comes from the first form-field definition, if any
                    object_shape = None
                    if current_form_field:
                        for form_field in current_form_field:
                            object_shape = form_field.get("nodeShape")
                            break

                    if current_prop_config.get("isVirtual"):
                        process_virtual_property_display(
                            display_name_simple,
                            current_prop_config,
                            subject,
                            grouped_triples,
                            fetched_values_map,
                            historical_snapshot,
                            highest_priority_shape,
                            highest_priority_class
                        )
                    else:
                        process_display_rule(
                            display_name_simple,
                            prop_uri,
                            current_prop_config,
                            subject,
                            triples,
                            grouped_triples,
                            fetched_values_map,
                            historical_snapshot,
                            highest_priority_shape,
                            object_shape,
                            highest_priority_class
                        )
                    if "orderedBy" in current_prop_config and not current_prop_config.get("isVirtual", False):
                        if display_name_simple not in grouped_triples:
                            grouped_triples[display_name_simple] = {
                                "property": prop_uri,
                                "triples": [],
                                "subjectClass": highest_priority_class,
                                "subjectShape": highest_priority_shape,
                                "objectShape": current_prop_config.get("shape"),
                            }
                        grouped_triples[display_name_simple]["is_draggable"] = True
                        grouped_triples[display_name_simple]["ordered_by"] = current_prop_config.get("orderedBy")
                        # process_ordering takes at most seven positional
                        # arguments; passing the subject shape as an eighth
                        # one raises TypeError, so it is not passed here.
                        process_ordering(
                            subject,
                            current_prop_config,
                            current_prop_config.get("orderedBy"),
                            grouped_triples,
                            display_name_simple,
                            fetched_values_map,
                            historical_snapshot,
                        )
                    if "intermediateRelation" in current_prop_config:
                        if display_name_simple not in grouped_triples:
                            grouped_triples[display_name_simple] = {
                                "property": prop_uri,
                                "triples": [],
                                "subjectClass": highest_priority_class,
                                "subjectShape": highest_priority_shape,
                                "objectShape": current_prop_config.get("shape"),
                            }
                        grouped_triples[display_name_simple]["intermediateRelation"] = current_prop_config["intermediateRelation"]
            else:
                # A rule matched the entity but does not configure this
                # property: it is still relevant and is grouped under its URI.
                relevant_properties.add(prop_uri)
                process_default_property(prop_uri, triples, grouped_triples, highest_priority_shape, highest_priority_class)
        else:
            # No display rules or no matching rule - add all properties to relevant_properties
            relevant_properties.add(prop_uri)
            process_default_property(prop_uri, triples, grouped_triples, highest_priority_shape, highest_priority_class)

    grouped_triples = OrderedDict(grouped_triples)
    return grouped_triples, relevant_properties
def process_display_rule(
    display_name,
    prop_uri,
    rule,
    subject,
    triples,
    grouped_triples,
    fetched_values_map,
    historical_snapshot=None,
    subject_shape=None,
    object_shape=None,
    subject_class=None,
):
    """
    Collect the triples of one predicate into grouped_triples[display_name].

    The group entry is created if it does not exist yet. Every triple whose
    predicate (as a string) equals prop_uri is then appended to the entry's
    "triples" list:

    - If the rule has "fetchValueFromQuery", the query is run (against the
      historical snapshot when one is given, otherwise against the live
      endpoint). A truthy result replaces the object as the displayed value
      and is recorded in fetched_values_map (displayed value -> original
      object); a falsy result means the triple is not appended.
    - Otherwise the object is used as is, except for rdf:type triples, whose
      displayed value is produced by the custom filter's
      human_readable_class.

    Both grouped_triples and fetched_values_map are modified in place;
    nothing is returned.
    """
    rdf_type = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type'

    if display_name not in grouped_triples:
        grouped_triples[display_name] = {
            "property": prop_uri,
            "triples": [],
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": object_shape,
            "intermediateRelation": rule.get("intermediateRelation"),
        }

    for triple in triples:
        predicate = str(triple[1])
        if predicate != prop_uri:
            continue

        if rule.get("fetchValueFromQuery"):
            fetch_query = rule["fetchValueFromQuery"]
            if historical_snapshot:
                result, external_entity = execute_historical_query(
                    fetch_query, subject, triple[2], historical_snapshot
                )
            else:
                result, external_entity = execute_sparql_query(
                    fetch_query, subject, triple[2]
                )

            if not result:
                # Nothing fetched for this object: the triple is left out
                continue

            original_object = str(triple[2])
            fetched_values_map[str(result)] = original_object
            grouped_triples[display_name]["triples"].append(
                {
                    "triple": (str(triple[0]), predicate, str(result)),
                    "external_entity": external_entity,
                    "object": original_object,
                    "subjectClass": subject_class,
                    "subjectShape": subject_shape,
                    "objectShape": object_shape,
                }
            )
            continue

        if predicate == rdf_type:
            from heritrace.utils.shacl_utils import determine_shape_for_classes
            type_shape = determine_shape_for_classes([triple[2]])
            shown_value = get_custom_filter().human_readable_class((triple[2], type_shape))
        else:
            shown_value = triple[2]

        grouped_triples[display_name]["triples"].append(
            {
                "triple": (str(triple[0]), predicate, shown_value),
                "object": str(triple[2]),
                "subjectClass": subject_class,
                "subjectShape": subject_shape,
                "objectShape": object_shape,
            }
        )
def process_virtual_property_display(
    display_name: str,
    prop_config: dict,
    subject: URIRef,
    grouped_triples: OrderedDict,
    fetched_values_map: dict,
    historical_snapshot: Optional[Graph] = None,
    subject_shape: Optional[str] = None,
    subject_class: Optional[str] = None
):
    """
    Process virtual properties by querying for entities that reference the current entity.

    The field that points back at the current entity is the first key of
    implementedVia.fieldOverrides whose "value" is "${currentEntity}". When
    no such field exists the function returns without touching
    grouped_triples. Otherwise the referencing entities are looked up (in
    the historical snapshot when given, else on the live endpoint) and a
    group entry flagged "is_virtual" is created under display_name, even if
    nothing was found, so that values can still be added from the interface.
    Display values are fetched per entity only when "fetchValueFromQuery" is
    configured.
    """
    implementation = prop_config.get("implementedVia", {})
    target = implementation.get("target", {})
    target_class = target.get("class")

    # Find which field should reference the current entity
    reference_field = next(
        (
            field_uri
            for field_uri, override in implementation.get("fieldOverrides", {}).items()
            if override.get("value") == "${currentEntity}"
        ),
        None,
    )
    if not reference_field:
        return

    decoded_subject = unquote(str(subject))

    # Query for entities that reference the current entity via the reference field
    query = f"""
        SELECT DISTINCT ?entity
        WHERE {{
            ?entity <{reference_field}> <{decoded_subject}> .
    """

    if target_class:
        query += f"""
            ?entity a <{target_class}> .
        """

    query += """
        }
    """

    if historical_snapshot:
        # Execute query on historical snapshot
        entity_uris = [str(row[0]) for row in list(historical_snapshot.query(query))]
    else:
        # Execute query on live triplestore
        endpoint = get_sparql()
        endpoint.setQuery(query)
        endpoint.setReturnFormat(JSON)
        bindings = endpoint.query().convert().get("results", {}).get("bindings", [])
        entity_uris = [binding["entity"]["value"] for binding in bindings]

    # The entry is needed whether or not any entity was found, so that the
    # virtual property can still be added via the interface.
    if display_name not in grouped_triples:
        grouped_triples[display_name] = {
            "property": display_name,  # Use display name as identifier for virtual properties
            "triples": [],
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": None,  # Should be None for virtual properties to match key format
            "is_virtual": True
        }

    # Display values are fetched only if fetchValueFromQuery is configured
    if not (prop_config.get("fetchValueFromQuery") and entity_uris):
        return

    for entity_uri in entity_uris:
        # Execute the fetch query for each entity
        if historical_snapshot:
            result, external_entity = execute_historical_query(
                prop_config["fetchValueFromQuery"],
                subject,
                URIRef(entity_uri),
                historical_snapshot
            )
        else:
            result, external_entity = execute_sparql_query(
                prop_config["fetchValueFromQuery"],
                str(subject),
                entity_uri
            )

        if not result:
            continue

        fetched_values_map[str(result)] = entity_uri
        grouped_triples[display_name]["triples"].append(
            {
                "triple": (str(subject), display_name, str(result)),
                "external_entity": external_entity,
                "object": entity_uri,
                "subjectClass": subject_class,
                "subjectShape": subject_shape,
                "objectShape": target.get("shape"),
                "is_virtual": True
            }
        )
def execute_sparql_query(query: str, subject: str, value: str) -> Tuple[str, str]:
    """
    Run a value-fetching query template against the live SPARQL endpoint.

    The placeholders [[subject]] and [[value]] are replaced with the
    URL-decoded subject and value, each wrapped in angle brackets. Only the
    first result row is used: its values are read in the order of the
    query's projected variables, and the first two are returned. Positions
    that are missing or unbound come back as None, and (None, None) is
    returned when the query yields no rows.
    """
    endpoint = get_sparql()

    subject_iri = f"<{unquote(subject)}>"
    value_iri = f"<{unquote(value)}>"
    query = query.replace("[[subject]]", subject_iri)
    query = query.replace("[[value]]", value_iri)

    endpoint.setQuery(query)
    endpoint.setReturnFormat(JSON)
    bindings = endpoint.query().convert().get("results", {}).get("bindings", [])
    if not bindings:
        return None, None

    # The query is parsed to recover the order of the projected variables
    projected_vars = translateQuery(parseQuery(query)).algebra["PV"]
    first_row = bindings[0]
    values = [
        first_row.get(str(var_name), {}).get("value", None)
        for var_name in projected_vars
    ]

    first_value = values[0] if values else None
    second_value = values[1] if len(values) > 1 else None
    return (first_value, second_value)
def process_ordering(
    subject,
    prop,
    order_property,
    grouped_triples,
    display_name,
    fetched_values_map,
    historical_snapshot: Dataset | Graph | None = None,
):
    """
    Sort grouped_triples[display_name]["triples"] in place so that they
    follow the chain defined by order_property.

    The objects of <subject> <prop['property']> are queried together with
    their order_property value (their "next" element), either on the
    historical snapshot or on the live endpoint. Chains are rebuilt from
    those links, and the triples are stably sorted once per chain; triples
    whose object is not part of the chain being applied sort last.

    Args:
        subject: URI of the entity whose values are ordered
        prop: Property configuration; its "property" key is the predicate
        order_property: Predicate linking each ordered entity to the next one
        grouped_triples: Groups built so far; the entry for display_name must
            already exist
        display_name: Key of the group to sort
        fetched_values_map: Maps displayed values back to the original object
            URIs, so fetched labels can be located in the chain
        historical_snapshot: Optional graph to query instead of the endpoint
    """
    def get_ordered_sequence(order_results):
        # Map each ordered entity to its successor (None at the end of a chain)
        order_map = {}
        for res in order_results:
            if isinstance(res, dict):  # For live triplestore results
                ordered_entity = res["orderedEntity"]["value"]
                next_value = res["nextValue"]["value"]
            else:  # For historical snapshot results
                ordered_entity = str(res[0])
                next_value = str(res[1])

            order_map[str(ordered_entity)] = (
                None if str(next_value) == "NONE" else str(next_value)
            )

        all_sequences = []
        # A chain starts at an entity that is nobody's successor
        start_elements = set(order_map.keys()) - set(order_map.values())
        while start_elements:
            sequence = []
            visited = set()
            current_element = start_elements.pop()
            # The visited check stops the walk if the links loop back on
            # themselves; without it malformed data would never terminate.
            while current_element in order_map and current_element not in visited:
                visited.add(current_element)
                sequence.append(current_element)
                current_element = order_map[current_element]
            all_sequences.append(sequence)
        return all_sequences

    decoded_subject = unquote(subject)

    sparql = get_sparql()

    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{decoded_subject}> <{prop['property']}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """
    if historical_snapshot:
        order_results = list(historical_snapshot.query(order_query))
    else:
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        order_results = sparql.query().convert().get("results", {}).get("bindings", [])

    order_sequences = get_ordered_sequence(order_results)
    for sequence in order_sequences:
        # Position lookup replaces repeated list.index scans; a sequence holds
        # each entity at most once, so the resulting order is the same.
        positions = {entity: index for index, entity in enumerate(sequence)}
        grouped_triples[display_name]["triples"].sort(
            key=lambda item: positions.get(
                fetched_values_map.get(
                    str(item["triple"][2]), str(item["triple"][2])
                ),
                float("inf"),
            )
        )
def process_default_property(prop_uri, triples, grouped_triples, subject_shape=None, subject_class=None):
    """
    Group the triples of a property that has no display configuration.

    The group is keyed by the property URI itself and replaces any entry
    already stored under that key. Each triple whose predicate (as a string)
    equals prop_uri is stored with all three terms converted to strings and
    with no object shape.
    """
    entry = {
        "property": prop_uri,
        "triples": [],
        "subjectClass": subject_class,
        "subjectShape": subject_shape,
        "objectShape": None
    }
    grouped_triples[prop_uri] = entry

    matching = [t for t in triples if str(t[1]) == prop_uri]
    entry["triples"].extend(
        {
            "triple": (str(s), str(p), str(o)),
            "object": str(o),
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": None,
        }
        for s, p, o in ((t[0], t[1], t[2]) for t in matching)
    )
def execute_historical_query(
    query: str, subject: str, value: str, historical_snapshot: Graph
) -> Tuple[Optional[str], Optional[str]]:
    """
    Run a value-fetching query template against a historical snapshot.

    The placeholders [[subject]] and [[value]] are replaced with the
    URL-decoded subject and value, each wrapped in angle brackets, and the
    query is executed on historical_snapshot.

    Returns:
        The two values of the first result row that has exactly two columns,
        converted to strings. An unbound column is returned as None rather
        than as the string "None", matching what execute_sparql_query
        returns for the live endpoint. (None, None) is returned when no such
        row exists.
    """
    decoded_subject = unquote(subject)
    decoded_value = unquote(value)
    query = query.replace("[[subject]]", f"<{decoded_subject}>")
    query = query.replace("[[value]]", f"<{decoded_value}>")
    results = historical_snapshot.query(query)
    if results:
        for result in results:
            # NOTE(review): rows with a different number of columns are
            # skipped, whereas execute_sparql_query accepts a single
            # projected variable - confirm whether that asymmetry is intended.
            if len(result) == 2:
                first_value, second_value = result[0], result[1]
                return (
                    str(first_value) if first_value is not None else None,
                    str(second_value) if second_value is not None else None,
                )
    return None, None
def get_property_order_from_rules(highest_priority_class: str, shape_uri: str = None):
    """
    List the property URIs of an entity type in display-rule order.

    Args:
        highest_priority_class: The highest priority class for the entity
        shape_uri: Optional shape URI for the entity

    Returns:
        The "property" values of the matching rule's displayProperties, in
        rule order. Entries that are not dictionaries, that are virtual, or
        that have no "property" key are skipped. An empty list is returned
        when no class is given or no rule matches.
    """
    if not highest_priority_class:
        return []

    matched_rule = find_matching_rule(highest_priority_class, shape_uri)
    if not matched_rule:
        return []

    return [
        entry["property"]
        for entry in matched_rule.get("displayProperties", [])
        # Virtual properties don't have RDF predicates
        if isinstance(entry, dict) and not entry.get("isVirtual") and "property" in entry
    ]
def get_predicate_ordering_info(predicate_uri: str, highest_priority_class: str, entity_shape: str = None) -> Optional[str]:
    """
    Return the ordering property configured for a predicate, if any.

    Args:
        predicate_uri: URI of the predicate to check
        highest_priority_class: The highest priority class for the subject entity
        entity_shape: Optional shape for the subject entity

    Returns:
        The "orderedBy" value of the first non-virtual display property whose
        "property" equals predicate_uri, or None when there are no display
        rules, no rule matches, the predicate is not configured, or it has no
        "orderedBy".
    """
    rules = get_display_rules()
    if not rules:
        return None

    matched_rule = find_matching_rule(highest_priority_class, entity_shape, rules)
    if not matched_rule:
        return None

    for entry in matched_rule.get("displayProperties", []):
        # Virtual properties don't have RDF predicates or ordering
        if not isinstance(entry, dict) or entry.get("isVirtual"):
            continue
        if entry.get("property") == predicate_uri:
            return entry.get("orderedBy")

    return None
def get_shape_order_from_display_rules(highest_priority_class: str, entity_shape: str, predicate_uri: str) -> list:
    """
    List the object shapes configured for a predicate, in displayRules order.

    Args:
        highest_priority_class: The highest priority class for the entity
        entity_shape: The shape for the subject entity
        predicate_uri: The predicate URI to get shape ordering for

    Returns:
        The truthy "shape" values of the displayRules belonging to the first
        matching, non-virtual display property that has displayRules. An
        empty list is returned when no rules exist, no rule matches, or no
        such property is found.
    """
    rules = get_display_rules()
    if not rules:
        return []

    matched_rule = find_matching_rule(highest_priority_class, entity_shape, rules)
    if not matched_rule or "displayProperties" not in matched_rule:
        return []

    for entry in matched_rule["displayProperties"]:
        # Skip malformed entries and virtual properties (no RDF predicate)
        usable = (
            isinstance(entry, dict)
            and not entry.get("isVirtual")
            and "property" in entry
        )
        if not usable or entry["property"] != predicate_uri:
            continue

        # A matching entry without displayRules does not end the search
        if "displayRules" in entry:
            shapes = []
            for nested_rule in entry["displayRules"]:
                if nested_rule.get("shape"):
                    shapes.append(nested_rule.get("shape"))
            return shapes

    return []
def get_similarity_properties(entity_key: Tuple[str, str]) -> Optional[List[Union[str, Dict[str, List[str]]]]]:
    """Gets the similarity properties configuration for a given entity key.

    The configuration lists the properties used for similarity matching.
    Elements of the list are combined with OR; an element of the form
    {'and': [...]} groups properties that must all match.

    Example structures:
        - ['prop1', 'prop2']                      # prop1 OR prop2
        - [{'and': ['prop3', 'prop4']}]           # prop3 AND prop4
        - ['prop1', {'and': ['prop2', 'prop3']}]  # prop1 OR (prop2 AND prop3)

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        A list where each element is either a property URI string or a dictionary
        {'and': [list_of_property_uris]}, representing the boolean logic.
        Returns None if no rule matches, if the rule has no non-empty
        "similarity_properties" list, or if any element is malformed (in
        which case a warning is printed).
    """
    class_uri = entity_key[0]
    shape_uri = entity_key[1]

    matched_rule = find_matching_rule(class_uri, shape_uri)
    if not matched_rule:
        return None

    configured = matched_rule.get("similarity_properties")
    if not configured or not isinstance(configured, list):
        return None

    def describe_target():
        # Suffix naming the class and/or shape in warning messages
        return (
            (f" for class {class_uri}" if class_uri else "")
            + (f" with shape {shape_uri}" if shape_uri else "")
        )

    accepted = []
    for element in configured:
        if isinstance(element, str):
            accepted.append(element)
            continue

        is_and_group = isinstance(element, dict) and len(element) == 1 and "and" in element
        if not is_and_group:
            print(
                "Warning: Invalid item format in similarity_properties list"
                + describe_target()
                + ". Expected a property URI string or {'and': [...]} dict."
            )
            return None  # Invalid item type

        members = element["and"]
        members_valid = (
            isinstance(members, list)
            and members
            and all(isinstance(member, str) for member in members)
        )
        if not members_valid:
            print(
                "Warning: Invalid 'and' group in similarity_properties"
                + describe_target()
                + ". Expected {'and': ['prop_uri', ...]} with a non-empty list of strings."
            )
            return None  # Invalid 'and' group structure

        accepted.append(element)

    # Return validated list or None if empty after validation
    return accepted if accepted else None