Coverage for heritrace / utils / display_rules_utils.py: 90%

374 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-21 12:56 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from collections import OrderedDict 

6from typing import Dict, List, Optional, Tuple, Union 

7from urllib.parse import unquote 

8 

9from heritrace.extensions import (get_custom_filter, get_display_rules, 

10 get_form_fields, get_sparql) 

11from rdflib import Dataset, Graph, Literal, URIRef 

12from rdflib.plugins.sparql.algebra import translateQuery 

13from rdflib.plugins.sparql.parser import parseQuery 

14from SPARQLWrapper import JSON 

15 

16 

17display_rules = get_display_rules() 

18 

19 

def find_matching_rule(class_uri=None, shape_uri=None, rules=None):
    """
    Find the most appropriate rule for a given class and/or shape.
    At least one of class_uri or shape_uri must be provided.

    Matching precedence:
        1. A rule whose target names both the given class and the given shape
           is returned as soon as it is encountered.
        2. Otherwise the best class-only rule and the best shape-only rule are
           tracked independently of each other (a lower ``priority`` number
           wins, the first rule seen wins a tie) and the one with the lower
           priority number is returned; the shape-only rule wins a tie.

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        rules: Optional list of rules to search in, defaults to the rules
            returned by get_display_rules()

    Returns:
        The matching rule or None if no match is found
    """
    if not rules:
        rules = get_display_rules()
        if not rules:
            return None

    class_key = str(class_uri) if class_uri else None
    shape_key = str(shape_uri) if shape_uri else None

    best_class_rule = None
    best_shape_rule = None

    for rule in rules:
        # A rule without a target cannot match anything; treat it as empty
        # instead of failing on the lookup.
        target = rule.get("target") or {}
        priority = rule.get("priority", 0)

        class_hit = (
            class_key is not None
            and "class" in target
            and target["class"] == class_key
        )
        shape_hit = (
            shape_key is not None
            and "shape" in target
            and target["shape"] == shape_key
        )

        # Case 1: both class and shape match, exact match takes precedence
        if class_hit and shape_hit:
            return rule

        # Case 2: class-only rule. Compared against the best class-only rule
        # seen so far, never against a shape-only candidate.
        if class_hit and "shape" not in target:
            if best_class_rule is None or priority < best_class_rule.get("priority", 0):
                best_class_rule = rule

        # Case 3: shape-only rule, tracked separately for the same reason.
        elif shape_hit and "class" not in target:
            if best_shape_rule is None or priority < best_shape_rule.get("priority", 0):
                best_shape_rule = rule

    # Shape rules are preferred when priorities are equal.
    if best_shape_rule is not None and (
        best_class_rule is None
        or best_shape_rule.get("priority", 0) <= best_class_rule.get("priority", 0)
    ):
        return best_shape_rule

    return best_class_rule

77 

78 

def get_class_priority(entity_key):
    """
    Return the display priority configured for an entity key.

    The priority is read from the rule selected by find_matching_rule. A
    matching rule without an explicit priority yields 0; when no rule matches
    the result is infinity, i.e. the lowest possible priority.

    Args:
        entity_key: A tuple (class_uri, shape_uri)
    """
    class_uri, shape_uri = entity_key[0], entity_key[1]
    matched = find_matching_rule(class_uri, shape_uri)
    if not matched:
        return float('inf')
    return matched.get("priority", 0)

93 

94 

def is_entity_type_visible(entity_key):
    """
    Tell whether an entity type should be displayed.

    The answer is the ``shouldBeDisplayed`` flag of the rule selected by
    find_matching_rule; entity types with no matching rule, or whose rule
    omits the flag, are visible.

    Args:
        entity_key: A tuple (class_uri, shape_uri)
    """
    class_uri, shape_uri = entity_key[0], entity_key[1]
    matched = find_matching_rule(class_uri, shape_uri)
    if not matched:
        return True
    return matched.get("shouldBeDisplayed", True)

107 

108 

def get_sortable_properties(entity_key: Tuple[str, str]) -> List[Dict[str, str]]:
    """
    Gets the sortable properties from display rules for an entity type and/or shape.
    Infers the sorting type from the form fields.

    Each returned entry is a copy of the ``sortableBy`` configuration with a
    ``sortType`` key added (``"string"`` unless the form fields say otherwise)
    and, when the property is also listed in ``displayProperties``, a
    ``displayName`` key taken from there.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        List of dictionaries with sorting information
    """
    display_rules = get_display_rules()
    if not display_rules:
        return []

    form_fields = get_form_fields()

    class_uri = entity_key[0]
    shape_uri = entity_key[1]

    rule = find_matching_rule(class_uri, shape_uri, display_rules)
    if not rule or "sortableBy" not in rule:
        return []

    # Field definitions for this exact (class, shape) key, if any.
    entity_fields = form_fields.get(entity_key) if form_fields else None

    sort_props = []
    for sort_config in rule["sortableBy"]:
        prop = sort_config.copy()
        prop_uri = prop["property"]

        for display_prop in rule.get("displayProperties", []):
            # Virtual display properties may carry no "property" key, so the
            # lookup must not assume it is there.
            if not isinstance(display_prop, dict) or display_prop.get("property") != prop_uri:
                continue
            nested_rules = display_prop.get("displayRules")
            if nested_rules:
                prop["displayName"] = nested_rules[0].get("displayName", prop_uri)
            else:
                prop["displayName"] = display_prop.get("displayName", prop_uri)
            break

        # Default to string sorting
        prop["sortType"] = "string"

        # Try to determine the sort type from the first field definition
        field_definitions = entity_fields.get(prop_uri) if entity_fields else None
        if field_definitions:
            prop["sortType"] = determine_sort_type(field_definitions[0])

        sort_props.append(prop)

    return sort_props

162 

163 

def determine_sort_type(field_info):
    """Map a form field definition to one of "string", "date", "number" or "boolean".

    A field with a ``nodeShape`` is always "string". Otherwise only the first
    entry of ``datatypes`` is inspected, lower-cased, and matched by substring
    in the order date, number, boolean. Anything else is "string".
    """
    if field_info.get("nodeShape"):
        return "string"

    datatypes = field_info.get("datatypes")
    if not datatypes:
        return "string"

    primary = str(datatypes[0]).lower()
    marker_table = (
        ("date", ("date", "time")),
        ("number", ("int", "float", "decimal", "double", "number")),
        ("boolean", ("boolean",)),
    )
    for sort_type, markers in marker_table:
        for marker in markers:
            if marker in primary:
                return sort_type

    return "string"

183 

184 

def get_highest_priority_class(subject_classes):
    """
    Pick the class with the best (numerically lowest) display priority.

    Each class is paired with the shape returned by
    determine_shape_for_classes and ranked with get_class_priority. The first
    class with the strictly lowest priority wins; when no class ranks below
    infinity, the first class of the input is returned.

    Args:
        subject_classes: List of class URIs

    Returns:
        The highest priority class as a string, or None if no classes are provided
    """
    # Imported lazily, as in the rest of this module.
    from heritrace.utils.shacl_utils import determine_shape_for_classes

    if not subject_classes:
        return None

    best_class = None
    best_priority = float('inf')

    for candidate in map(str, subject_classes):
        candidate_shape = determine_shape_for_classes([candidate])
        candidate_priority = get_class_priority((candidate, candidate_shape))
        if candidate_priority < best_priority:
            best_priority, best_class = candidate_priority, candidate

    if best_class is None:
        best_class = str(subject_classes[0])

    return best_class

216 

217 

def get_grouped_triples(
    subject: URIRef,
    triples: List[Tuple[URIRef, URIRef, URIRef|Literal]],
    valid_predicates_info: List[str],
    historical_snapshot: Optional[Graph] = None,
    highest_priority_class: Optional[str] = None,
    highest_priority_shape: Optional[str] = None
) -> Tuple[OrderedDict, set]:
    """
    This function groups the triples based on the display rules.
    It also fetches the values for the properties that are configured to be fetched from the query.

    Properties are processed in the order given by the matching rule's
    ``displayProperties``, followed by any remaining entries of
    valid_predicates_info.

    Args:
        subject: The subject URI
        triples: List of triples for the subject
        valid_predicates_info: List of valid predicates for the subject
        historical_snapshot: Optional historical snapshot graph
        highest_priority_class: The highest priority class URI for the subject
        highest_priority_shape: The highest priority shape URI for the subject

    Returns:
        Tuple of (grouped triples, relevant properties). The map of fetched
        values built while processing is used internally for ordering and is
        not returned.

    Note:
        relevant_properties contains all properties that should be considered
        "relevant" for UI operations (adding/deleting). This includes:
        - Properties configured in display rules when rules exist and match
        - ALL valid properties when no display rules exist or no rules match
        This ensures users can always interact with entities even without display rules.
    """
    display_rules = get_display_rules()
    form_fields = get_form_fields()

    grouped_triples = OrderedDict()
    relevant_properties = set()
    fetched_values_map = dict()

    matching_rule = find_matching_rule(highest_priority_class, highest_priority_shape, display_rules)
    matching_form_field = (
        form_fields.get((highest_priority_class, highest_priority_shape))
        if form_fields
        else None
    )

    prop_configs = []
    if display_rules and matching_rule:
        prop_configs = matching_rule.get("displayProperties", [])

    # Configured properties first (in rule order), then every other valid predicate.
    ordered_properties = []
    for prop_config in prop_configs:
        prop_uri = _display_property_identifier(prop_config)
        if prop_uri and prop_uri not in ordered_properties:
            ordered_properties.append(prop_uri)
    for prop_uri in valid_predicates_info:
        if prop_uri not in ordered_properties:
            ordered_properties.append(prop_uri)

    for prop_uri in ordered_properties:
        current_prop_config = next(
            (cfg for cfg in prop_configs if _display_property_identifier(cfg) == prop_uri),
            None,
        )

        if not current_prop_config:
            # No rule, or no configuration for this property: default rendering,
            # and the property stays available for UI operations.
            relevant_properties.add(prop_uri)
            process_default_property(prop_uri, triples, grouped_triples, highest_priority_shape, highest_priority_class)
            continue

        is_virtual = bool(current_prop_config.get("isVirtual"))
        # Ordering only applies to real predicates.
        is_ordered = "orderedBy" in current_prop_config and not is_virtual
        order_property = current_prop_config.get("orderedBy")

        if "displayRules" in current_prop_config:
            for display_rule_nested in current_prop_config["displayRules"]:
                display_name_nested = display_rule_nested.get("displayName", prop_uri)
                relevant_properties.add(prop_uri)
                object_shape = display_rule_nested.get("shape")

                if is_virtual:
                    process_virtual_property_display(
                        display_name_nested,
                        current_prop_config,
                        subject,
                        grouped_triples,
                        fetched_values_map,
                        historical_snapshot,
                        highest_priority_shape,
                        highest_priority_class
                    )
                else:
                    process_display_rule(
                        display_name_nested,
                        prop_uri,
                        display_rule_nested,
                        subject,
                        triples,
                        grouped_triples,
                        fetched_values_map,
                        historical_snapshot,
                        highest_priority_shape,
                        object_shape,
                        highest_priority_class
                    )

                if is_ordered:
                    group = grouped_triples[display_name_nested]
                    group["is_draggable"] = True
                    group["ordered_by"] = order_property
                    process_ordering(
                        subject,
                        current_prop_config,
                        order_property,
                        grouped_triples,
                        display_name_nested,
                        fetched_values_map,
                        historical_snapshot,
                    )

                group = _ensure_group_entry(
                    grouped_triples,
                    display_name_nested,
                    prop_uri,
                    highest_priority_class,
                    highest_priority_shape,
                    object_shape,
                )

                # The nested rule's intermediateRelation wins over the property-level one.
                if "intermediateRelation" in display_rule_nested:
                    group["intermediateRelation"] = display_rule_nested["intermediateRelation"]
                elif "intermediateRelation" in current_prop_config:
                    group["intermediateRelation"] = current_prop_config["intermediateRelation"]
        else:
            display_name_simple = current_prop_config.get("displayName", prop_uri)
            # Only add non-virtual properties to relevant_properties
            # Virtual properties are handled separately in entity.py
            if not is_virtual:
                relevant_properties.add(prop_uri)

            # The object shape comes from the first form field definition, if any.
            current_form_field = matching_form_field.get(prop_uri) if matching_form_field else None
            object_shape = None
            if current_form_field:
                object_shape = next(iter(current_form_field)).get("nodeShape")

            if is_virtual:
                process_virtual_property_display(
                    display_name_simple,
                    current_prop_config,
                    subject,
                    grouped_triples,
                    fetched_values_map,
                    historical_snapshot,
                    highest_priority_shape,
                    highest_priority_class
                )
            else:
                process_display_rule(
                    display_name_simple,
                    prop_uri,
                    current_prop_config,
                    subject,
                    triples,
                    grouped_triples,
                    fetched_values_map,
                    historical_snapshot,
                    highest_priority_shape,
                    object_shape,
                    highest_priority_class
                )

            if is_ordered:
                group = _ensure_group_entry(
                    grouped_triples,
                    display_name_simple,
                    prop_uri,
                    highest_priority_class,
                    highest_priority_shape,
                    current_prop_config.get("shape"),
                )
                group["is_draggable"] = True
                group["ordered_by"] = order_property
                # process_ordering takes seven arguments; the subject shape is
                # not one of them.
                process_ordering(
                    subject,
                    current_prop_config,
                    order_property,
                    grouped_triples,
                    display_name_simple,
                    fetched_values_map,
                    historical_snapshot,
                )

            if "intermediateRelation" in current_prop_config:
                group = _ensure_group_entry(
                    grouped_triples,
                    display_name_simple,
                    prop_uri,
                    highest_priority_class,
                    highest_priority_shape,
                    current_prop_config.get("shape"),
                )
                group["intermediateRelation"] = current_prop_config["intermediateRelation"]

    return grouped_triples, relevant_properties


def _display_property_identifier(prop_config):
    """Return the key a display property is addressed by: displayName if virtual, else property."""
    if prop_config.get("isVirtual"):
        return prop_config.get("displayName")
    return prop_config.get("property")


def _ensure_group_entry(grouped_triples, display_name, prop_uri, subject_class, subject_shape, object_shape):
    """Create an empty group for display_name if it is missing and return the group."""
    if display_name not in grouped_triples:
        grouped_triples[display_name] = {
            "property": prop_uri,
            "triples": [],
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": object_shape,
        }
    return grouped_triples[display_name]

419 

420 

def process_display_rule(
    display_name,
    prop_uri,
    rule,
    subject,
    triples,
    grouped_triples,
    fetched_values_map,
    historical_snapshot=None,
    subject_shape=None,
    object_shape=None,
    subject_class=None,
):
    """
    Add the triples of one predicate to the group named display_name.

    The group is created in grouped_triples if missing. For every triple whose
    predicate equals prop_uri:
    - if the rule has ``fetchValueFromQuery``, the query is run (against the
      historical snapshot when one is given, otherwise through
      execute_sparql_query). A truthy result replaces the object in the
      displayed triple and is recorded in fetched_values_map as
      ``result -> original object``; a falsy result drops the triple.
    - otherwise the object is shown as is, except for rdf:type objects which
      are rendered with the custom filter's human_readable_class.

    Args:
        display_name: Key of the group in grouped_triples
        prop_uri: Predicate URI (string) to select triples by
        rule: Display configuration for the property
        subject: Subject passed to the fetch query
        triples: Iterable of (subject, predicate, object) triples
        grouped_triples: Mapping updated in place
        fetched_values_map: Mapping updated in place
        historical_snapshot: Optional snapshot graph to query instead of the live store
        subject_shape: Shape recorded on the group and on each entry
        object_shape: Object shape recorded on the group and on each entry
        subject_class: Class recorded on the group and on each entry
    """
    if display_name not in grouped_triples:
        grouped_triples[display_name] = {
            "property": prop_uri,
            "triples": [],
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": object_shape,
            "intermediateRelation": rule.get("intermediateRelation"),
        }

    group_entries = grouped_triples[display_name]["triples"]
    fetch_query = rule.get("fetchValueFromQuery")

    for triple in triples:
        if str(triple[1]) != prop_uri:
            continue

        object_uri = str(triple[2])

        if fetch_query:
            # Compare against None: an rdflib Graph with no triples is falsy,
            # and an empty snapshot must not fall back to the live store.
            if historical_snapshot is not None:
                result, external_entity = execute_historical_query(
                    fetch_query,
                    subject,
                    triple[2],
                    historical_snapshot,
                )
            else:
                result, external_entity = execute_sparql_query(
                    fetch_query, subject, triple[2]
                )
            if not result:
                continue

            fetched_values_map[str(result)] = object_uri
            group_entries.append({
                "triple": (str(triple[0]), str(triple[1]), str(result)),
                "external_entity": external_entity,
                "object": object_uri,
                "subjectClass": subject_class,
                "subjectShape": subject_shape,
                "objectShape": object_shape,
            })
            continue

        if str(triple[1]) == 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type':
            from heritrace.utils.shacl_utils import determine_shape_for_classes
            object_class_shape = determine_shape_for_classes([triple[2]])
            result = get_custom_filter().human_readable_class((triple[2], object_class_shape))
        else:
            result = triple[2]

        group_entries.append({
            "triple": (str(triple[0]), str(triple[1]), result),
            "object": object_uri,
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": object_shape,
        })

488 

489 

def process_virtual_property_display(
    display_name: str,
    prop_config: dict,
    subject: URIRef,
    grouped_triples: OrderedDict,
    fetched_values_map: dict,
    historical_snapshot: Optional[Graph] = None,
    subject_shape: Optional[str] = None,
    subject_class: Optional[str] = None
):
    """Process virtual properties by querying for entities that reference the current entity.

    The referencing predicate is the first key of
    ``implementedVia.fieldOverrides`` whose ``value`` is ``"${currentEntity}"``;
    without one the function returns without touching grouped_triples.
    Otherwise a group for display_name is always created (even when empty, so
    the property can be added via the interface), and, when
    ``fetchValueFromQuery`` is configured, one entry is appended per
    referencing entity for which the fetch query returns a truthy value.

    Args:
        display_name: Key of the group; also used as its "property" identifier
        prop_config: Display configuration of the virtual property
        subject: URI of the current entity
        grouped_triples: Mapping updated in place
        fetched_values_map: Updated in place with ``result -> entity URI``
        historical_snapshot: Optional snapshot graph to query instead of the live store
        subject_shape: Shape recorded on the group and on each entry
        subject_class: Class recorded on the group and on each entry
    """
    implementation = prop_config.get("implementedVia", {})
    field_overrides = implementation.get("fieldOverrides", {})
    target = implementation.get("target", {})
    target_class = target.get("class")

    # Find which field should reference the current entity
    reference_field = next(
        (
            field_uri
            for field_uri, override in field_overrides.items()
            if override.get("value") == "${currentEntity}"
        ),
        None,
    )
    if not reference_field:
        return

    decoded_subject = unquote(str(subject))

    # Query for entities that reference the current entity via the reference field
    patterns = [f"?entity <{reference_field}> <{decoded_subject}> ."]
    if target_class:
        patterns.append(f"?entity a <{target_class}> .")
    query = (
        "\n    SELECT DISTINCT ?entity\n    WHERE {\n        "
        + "\n        ".join(patterns)
        + "\n    }\n    "
    )

    # Compare against None: an rdflib Graph with no triples is falsy, and an
    # empty snapshot must not fall back to the live triplestore.
    use_snapshot = historical_snapshot is not None

    if use_snapshot:
        entity_uris = [str(row[0]) for row in historical_snapshot.query(query)]
    else:
        sparql = get_sparql()
        sparql.setQuery(query)
        sparql.setReturnFormat(JSON)
        bindings = sparql.query().convert().get("results", {}).get("bindings", [])
        entity_uris = [binding["entity"]["value"] for binding in bindings]

    if display_name not in grouped_triples:
        grouped_triples[display_name] = {
            "property": display_name,  # Use display name as identifier for virtual properties
            "triples": [],
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": None,  # Should be None for virtual properties to match key format
            "is_virtual": True
        }

    fetch_query = prop_config.get("fetchValueFromQuery")
    if not fetch_query or not entity_uris:
        return

    for entity_uri in entity_uris:
        # Execute the fetch query for each entity
        if use_snapshot:
            result, external_entity = execute_historical_query(
                fetch_query,
                subject,
                URIRef(entity_uri),
                historical_snapshot
            )
        else:
            result, external_entity = execute_sparql_query(
                fetch_query,
                str(subject),
                entity_uri
            )

        if not result:
            continue

        fetched_values_map[str(result)] = entity_uri
        grouped_triples[display_name]["triples"].append({
            "triple": (str(subject), display_name, str(result)),
            "external_entity": external_entity,
            "object": entity_uri,
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": target.get("shape"),
            "is_virtual": True
        })

602 

603 

def execute_sparql_query(query: str, subject: str, value: str) -> Tuple[str, str]:
    """
    Run a fetch query against the live SPARQL endpoint.

    The ``[[subject]]`` and ``[[value]]`` placeholders are replaced by the
    URL-decoded subject and value wrapped in angle brackets. Only the first
    result row is used; its values are read in the order of the query's
    projected variables, obtained by parsing the query with rdflib.

    Args:
        query: Query template containing the placeholders
        subject: Subject URI
        value: Value URI

    Returns:
        (first projected value, second projected value); a missing or unbound
        position is None, and (None, None) is returned when there are no rows.
    """
    endpoint = get_sparql()

    subject_term = f"<{unquote(subject)}>"
    value_term = f"<{unquote(value)}>"
    bound_query = query.replace("[[subject]]", subject_term).replace("[[value]]", value_term)

    endpoint.setQuery(bound_query)
    endpoint.setReturnFormat(JSON)
    bindings = endpoint.query().convert().get("results", {}).get("bindings", [])
    if not bindings:
        return None, None

    # The algebra's "PV" entry lists the projected variables in SELECT order.
    projected_vars = translateQuery(parseQuery(bound_query)).algebra["PV"]
    first_row = bindings[0]
    row_values = [
        first_row.get(str(variable), {}).get("value", None)
        for variable in projected_vars
    ]

    first_value = row_values[0] if row_values else None
    second_value = row_values[1] if len(row_values) > 1 else None
    return (first_value, second_value)

627 

628 

def process_ordering(
    subject,
    prop,
    order_property,
    grouped_triples,
    display_name,
    fetched_values_map,
    historical_snapshot: Dataset | Graph | None = None,
):
    """
    Sort the entries of a group according to a linked-list ordering property.

    Queries, for every object of ``<subject> <prop['property']> ?o``, the
    entity it points to through order_property, rebuilds the chains and sorts
    ``grouped_triples[display_name]["triples"]`` in place. Displayed values are
    translated back to entity URIs through fetched_values_map before being
    looked up in a chain; entries that are not part of the chain sort last.

    Args:
        subject: URI of the entity owning the ordered property
        prop: Property configuration; only ``prop['property']`` is read
        order_property: Predicate linking an ordered entity to the next one
        grouped_triples: Mapping whose group display_name is sorted in place
        display_name: Key of the group to sort
        fetched_values_map: Mapping from displayed value to entity URI
        historical_snapshot: Optional snapshot to query instead of the live store
    """
    def get_ordered_sequence(order_results):
        order_map = {}
        for res in order_results:
            if isinstance(res, dict):  # For live triplestore results
                ordered_entity = res["orderedEntity"]["value"]
                next_value = res["nextValue"]["value"]
            else:  # For historical snapshot results
                ordered_entity = str(res[0])
                next_value = str(res[1])

            order_map[str(ordered_entity)] = (
                None if str(next_value) == "NONE" else str(next_value)
            )

        all_sequences = []
        # Chain heads are entities nobody points to; sorted for a stable result.
        start_elements = set(order_map.keys()) - set(order_map.values())
        for start_element in sorted(start_elements):
            sequence = []
            visited = set()
            current_element = start_element
            # The visited set stops the walk if the data links back into the
            # chain, which would otherwise never terminate.
            while current_element in order_map and current_element not in visited:
                visited.add(current_element)
                sequence.append(current_element)
                current_element = order_map[current_element]
            all_sequences.append(sequence)
        return all_sequences

    decoded_subject = unquote(subject)

    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{decoded_subject}> <{prop['property']}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """
    # Compare against None: an rdflib Graph with no triples is falsy, and an
    # empty snapshot must not fall back to the live triplestore.
    if historical_snapshot is not None:
        order_results = list(historical_snapshot.query(order_query))
    else:
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        order_results = sparql.query().convert().get("results", {}).get("bindings", [])

    entries = grouped_triples[display_name]["triples"]

    def entity_of(entry):
        shown_value = str(entry["triple"][2])
        return fetched_values_map.get(shown_value, shown_value)

    for sequence in get_ordered_sequence(order_results):
        positions = {entity: index for index, entity in enumerate(sequence)}
        entries.sort(
            key=lambda entry, positions=positions: positions.get(entity_of(entry), float("inf"))
        )

695 

696 

def process_default_property(prop_uri, triples, grouped_triples, subject_shape=None, subject_class=None):
    """
    Store the triples of an unconfigured predicate under the predicate URI itself.

    Any existing group with that key is replaced. Subject, predicate and
    object are stringified, and no object shape is recorded.

    Args:
        prop_uri: Predicate URI (string); used both as group key and as filter
        triples: Iterable of (subject, predicate, object) triples
        grouped_triples: Mapping updated in place
        subject_shape: Shape recorded on the group and on each entry
        subject_class: Class recorded on the group and on each entry
    """
    group = {
        "property": prop_uri,
        "triples": [],
        "subjectClass": subject_class,
        "subjectShape": subject_shape,
        "objectShape": None
    }
    grouped_triples[prop_uri] = group

    for triple in triples:
        predicate = str(triple[1])
        if predicate != prop_uri:
            continue
        object_value = str(triple[2])
        group["triples"].append({
            "triple": (str(triple[0]), predicate, object_value),
            "object": object_value,
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": None,
        })

716 

717 

def execute_historical_query(
    query: str, subject: str, value: str, historical_snapshot: Graph
) -> Tuple[Optional[str], Optional[str]]:
    """
    Run a fetch query against a historical snapshot.

    The ``[[subject]]`` and ``[[value]]`` placeholders are replaced by the
    URL-decoded subject and value wrapped in angle brackets. The first
    non-empty result row is used and read positionally, mirroring
    execute_sparql_query: queries projecting a single variable are supported,
    and columns beyond the second are ignored.

    Args:
        query: Query template containing the placeholders
        subject: Subject URI
        value: Value URI
        historical_snapshot: Object exposing ``query(str)``, e.g. an rdflib Graph

    Returns:
        (first value, second value) as strings; a missing or unbound position
        is None, and (None, None) is returned when there are no rows.
    """
    decoded_subject = unquote(subject)
    decoded_value = unquote(value)
    query = query.replace("[[subject]]", f"<{decoded_subject}>")
    query = query.replace("[[value]]", f"<{decoded_value}>")

    results = historical_snapshot.query(query)
    if results:
        for row in results:
            if len(row) == 0:
                continue
            first_value = row[0]
            second_value = row[1] if len(row) > 1 else None
            # Keep unbound values as None rather than the string "None".
            return (
                str(first_value) if first_value is not None else None,
                str(second_value) if second_value is not None else None,
            )
    return None, None

731 

732 

def get_property_order_from_rules(highest_priority_class: str, shape_uri: str = None):
    """
    List the predicates of the matching display rule in their configured order.

    Entries of ``displayProperties`` that are not dictionaries, that are
    virtual, or that have no ``property`` key are left out.

    Args:
        highest_priority_class: The highest priority class for the entity
        shape_uri: Optional shape URI for the entity

    Returns:
        List of property URIs in the order specified by display rules; empty
        when no class is given or no rule matches
    """
    if not highest_priority_class:
        return []

    matched = find_matching_rule(highest_priority_class, shape_uri)
    if not matched:
        return []

    return [
        entry["property"]
        for entry in matched.get("displayProperties", [])
        # Virtual properties don't have RDF predicates
        if isinstance(entry, dict) and not entry.get("isVirtual") and "property" in entry
    ]

761 

762 

def get_predicate_ordering_info(predicate_uri: str, highest_priority_class: str, entity_shape: str = None) -> Optional[str]:
    """
    Look up the ordering property configured for a predicate.

    Only the first non-virtual ``displayProperties`` entry whose ``property``
    equals predicate_uri is considered.

    Args:
        predicate_uri: URI of the predicate to check
        highest_priority_class: The highest priority class for the subject entity
        entity_shape: Optional shape for the subject entity

    Returns:
        The ``orderedBy`` value of that entry, or None when there are no
        display rules, no matching rule, no such entry, or no ordering on it
    """
    rules = get_display_rules()
    if not rules:
        return None

    matched = find_matching_rule(highest_priority_class, entity_shape, rules)
    if not matched:
        return None

    # Virtual properties don't have RDF predicates or ordering
    candidates = (
        entry
        for entry in matched.get("displayProperties", [])
        if isinstance(entry, dict)
        and not entry.get("isVirtual")
        and entry.get("property") == predicate_uri
    )
    first_hit = next(candidates, None)
    if first_hit is None:
        return None
    return first_hit.get("orderedBy")

792 

793 

def get_shape_order_from_display_rules(highest_priority_class: str, entity_shape: str, predicate_uri: str) -> list:
    """
    Collect the shapes named by a predicate's nested display rules, in order.

    The first non-virtual ``displayProperties`` entry for predicate_uri that
    has ``displayRules`` provides the answer; nested rules without a shape are
    left out.

    Args:
        highest_priority_class: The highest priority class for the entity
        entity_shape: The shape for the subject entity
        predicate_uri: The predicate URI to get shape ordering for

    Returns:
        List of shape URIs in the order specified in displayRules, or empty list if no rules found
    """
    rules = get_display_rules()
    if not rules:
        return []

    matched = find_matching_rule(highest_priority_class, entity_shape, rules)
    if not matched or "displayProperties" not in matched:
        return []

    for entry in matched["displayProperties"]:
        # Skip malformed entries and virtual properties, which have no RDF predicate.
        usable = (
            isinstance(entry, dict)
            and not entry.get("isVirtual")
            and "property" in entry
        )
        if not usable or entry["property"] != predicate_uri:
            continue
        if "displayRules" not in entry:
            continue
        shapes = (nested.get("shape") for nested in entry["displayRules"])
        return [shape for shape in shapes if shape]

    return []

827 

828 

def get_similarity_properties(entity_key: Tuple[str, str]) -> Optional[List[Union[str, Dict[str, List[str]]]]]:
    """Return the validated ``similarity_properties`` of the rule matching an entity key.

    The configuration is a list with OR semantics between its elements; an
    element is either a property URI string or a single-key dictionary
    ``{'and': [...]}`` whose members are combined with AND.

    Example structures:
        - ['prop1', 'prop2']                      # prop1 OR prop2
        - [{'and': ['prop3', 'prop4']}]           # prop3 AND prop4
        - ['prop1', {'and': ['prop2', 'prop3']}]  # prop1 OR (prop2 AND prop3)

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The list of accepted elements, or None when no rule matches, the
        setting is missing, empty or not a list, or any element is malformed
        (in which case a warning is printed).
    """
    class_uri, shape_uri = entity_key[0], entity_key[1]

    matched = find_matching_rule(class_uri, shape_uri)
    if not matched:
        return None

    configured = matched.get("similarity_properties")
    if not isinstance(configured, list) or not configured:
        return None

    # Shared middle part of both warning messages.
    context = (
        (f" for class {class_uri}" if class_uri else "") +
        (f" with shape {shape_uri}" if shape_uri else "")
    )

    accepted = []
    for entry in configured:
        if isinstance(entry, str):
            accepted.append(entry)
            continue

        if not (isinstance(entry, dict) and len(entry) == 1 and "and" in entry):
            print(
                f"Warning: Invalid item format in similarity_properties list" +
                context +
                f". Expected a property URI string or {{'and': [...]}} dict."
            )
            return None  # Invalid item type

        members = entry["and"]
        members_ok = (
            isinstance(members, list)
            and members
            and all(isinstance(member, str) for member in members)
        )
        if not members_ok:
            print(
                f"Warning: Invalid 'and' group in similarity_properties" +
                context +
                f". Expected {{'and': ['prop_uri', ...]}} with a non-empty list of strings."
            )
            return None  # Invalid 'and' group structure

        accepted.append(entry)

    return accepted or None