Coverage for heritrace/utils/display_rules_utils.py: 93%

360 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-10-13 17:12 +0000

1from collections import OrderedDict 

2from typing import Dict, List, Optional, Tuple, Union 

3from urllib.parse import unquote 

4 

5from heritrace.extensions import (get_custom_filter, get_display_rules, 

6 get_form_fields, get_sparql) 

7from rdflib import ConjunctiveGraph, Graph, Literal, URIRef 

8from rdflib.plugins.sparql.algebra import translateQuery 

9from rdflib.plugins.sparql.parser import parseQuery 

10from SPARQLWrapper import JSON 

11 

12 

13display_rules = get_display_rules() 

14 

15 

16def find_matching_rule(class_uri=None, shape_uri=None, rules=None): 

17 """ 

18 Find the most appropriate rule for a given class and/or shape. 

19 At least one of class_uri or shape_uri must be provided. 

20  

21 Args: 

22 class_uri: Optional URI of the class 

23 shape_uri: Optional URI of the shape 

24 rules: Optional list of rules to search in, defaults to global display_rules 

25  

26 Returns: 

27 The matching rule or None if no match is found 

28 """ 

29 if not rules: 

30 rules = get_display_rules() 

31 if not rules: 

32 return None 

33 

34 # Initialize variables to track potential matches 

35 class_match = None 

36 shape_match = None 

37 highest_priority = float('inf') 

38 

39 # Scan all rules to find the best match based on priority 

40 for rule in rules: 

41 rule_priority = rule.get("priority", 0) 

42 

43 # Case 1: Both class and shape match (exact match) 

44 if class_uri and shape_uri and \ 

45 "class" in rule["target"] and rule["target"]["class"] == str(class_uri) and \ 

46 "shape" in rule["target"] and rule["target"]["shape"] == str(shape_uri): 

47 # Exact match always takes highest precedence 

48 return rule 

49 

50 # Case 2: Only class matches 

51 elif class_uri and "class" in rule["target"] and rule["target"]["class"] == str(class_uri) and \ 

52 "shape" not in rule["target"]: 

53 if class_match is None or rule_priority < highest_priority: 

54 class_match = rule 

55 highest_priority = rule_priority 

56 

57 # Case 3: Only shape matches 

58 elif shape_uri and "shape" in rule["target"] and rule["target"]["shape"] == str(shape_uri) and \ 

59 "class" not in rule["target"]: 

60 if shape_match is None or rule_priority < highest_priority: 

61 shape_match = rule 

62 highest_priority = rule_priority 

63 

64 # Return the best match based on priority 

65 # Shape rules typically have higher specificity, so prefer them if they have equal priority 

66 if shape_match and (class_match is None or 

67 shape_match.get("priority", 0) <= class_match.get("priority", 0)): 

68 return shape_match 

69 elif class_match: 

70 return class_match 

71 

72 return None 

73 

74 

75def get_class_priority(entity_key): 

76 """ 

77 Returns the priority of a specific entity key (class_uri, shape_uri). 

78 Calculates the priority directly from the display rules. 

79 Classes without defined rules receive the lowest priority (highest number). 

80  

81 Args: 

82 entity_key: A tuple (class_uri, shape_uri) 

83 """ 

84 class_uri = entity_key[0] 

85 shape_uri = entity_key[1] 

86 

87 rule = find_matching_rule(class_uri, shape_uri) 

88 return rule.get("priority", 0) if rule else float('inf') 

89 

90 

91def is_entity_type_visible(entity_key): 

92 """ 

93 Determines if an entity type should be displayed. 

94  

95 Args: 

96 entity_key: A tuple (class_uri, shape_uri) 

97 """ 

98 class_uri = entity_key[0] 

99 shape_uri = entity_key[1] 

100 

101 rule = find_matching_rule(class_uri, shape_uri) 

102 return rule.get("shouldBeDisplayed", True) if rule else True 

103 

104 

105def get_sortable_properties(entity_key: Tuple[str, str]) -> List[Dict[str, str]]: 

106 """ 

107 Gets the sortable properties from display rules for an entity type and/or shape. 

108 Infers the sorting type from form_fields_cache. 

109 

110 Args: 

111 entity_key: A tuple (class_uri, shape_uri) 

112 

113 Returns: 

114 List of dictionaries with sorting information 

115 """ 

116 display_rules = get_display_rules() 

117 if not display_rules: 

118 return [] 

119 

120 form_fields = get_form_fields() 

121 

122 class_uri = entity_key[0] 

123 shape_uri = entity_key[1] 

124 

125 rule = find_matching_rule(class_uri, shape_uri, display_rules) 

126 if not rule or "sortableBy" not in rule: 

127 return [] 

128 

129 sort_props = [] 

130 for sort_config in rule["sortableBy"]: 

131 prop = sort_config.copy() 

132 

133 for display_prop in rule["displayProperties"]: 

134 if display_prop["property"] == prop["property"]: 

135 if "displayRules" in display_prop: 

136 prop["displayName"] = display_prop["displayRules"][0][ 

137 "displayName" 

138 ] 

139 else: 

140 prop["displayName"] = display_prop.get( 

141 "displayName", prop["property"] 

142 ) 

143 break 

144 

145 # Default to string sorting 

146 prop["sortType"] = "string" 

147 

148 # Try to determine the sort type from form fields 

149 if form_fields: 

150 # First try with the exact entity_key (class, shape) 

151 if entity_key in form_fields and prop["property"] in form_fields[entity_key]: 

152 field_info = form_fields[entity_key][prop["property"]][0] # Take the first field definition 

153 prop["sortType"] = determine_sort_type(field_info) 

154 

155 sort_props.append(prop) 

156 

157 return sort_props 

158 

159 

160def determine_sort_type(field_info): 

161 """Helper function to determine sort type from field info.""" 

162 # If there's a shape, it's a reference to an entity (sort by label) 

163 if field_info.get("nodeShape"): 

164 return "string" 

165 # Otherwise look at the datatypes 

166 elif field_info.get("datatypes"): 

167 datatype = str(field_info["datatypes"][0]).lower() 

168 if any(t in datatype for t in ["date", "time"]): 

169 return "date" 

170 elif any( 

171 t in datatype 

172 for t in ["int", "float", "decimal", "double", "number"] 

173 ): 

174 return "number" 

175 elif "boolean" in datatype: 

176 return "boolean" 

177 # Default to string 

178 return "string" 

179 

180 

181def get_highest_priority_class(subject_classes): 

182 """ 

183 Find the highest priority class from the given list of classes. 

184  

185 Args: 

186 subject_classes: List of class URIs 

187  

188 Returns: 

189 The highest priority class or None if no classes are provided 

190 """ 

191 from heritrace.utils.shacl_utils import determine_shape_for_classes 

192 

193 if not subject_classes: 

194 return None 

195 

196 highest_priority = float('inf') 

197 highest_priority_class = None 

198 

199 for class_uri in subject_classes: 

200 class_uri = str(class_uri) 

201 shape = determine_shape_for_classes([class_uri]) 

202 entity_key = (class_uri, shape) 

203 priority = get_class_priority(entity_key) 

204 if priority < highest_priority: 

205 highest_priority = priority 

206 highest_priority_class = class_uri 

207 

208 if highest_priority_class is None and subject_classes: 

209 highest_priority_class = str(subject_classes[0]) 

210 

211 return highest_priority_class 

212 

213 

214def get_grouped_triples( 

215 subject: URIRef, 

216 triples: List[Tuple[URIRef, URIRef, URIRef|Literal]], 

217 valid_predicates_info: List[str], 

218 historical_snapshot: Optional[Graph] = None, 

219 highest_priority_class: Optional[str] = None, 

220 highest_priority_shape: Optional[str] = None 

221) -> Tuple[OrderedDict, set, dict]: 

222 """ 

223 This function groups the triples based on the display rules.  

224 It also fetches the values for the properties that are configured to be fetched from the query. 

225  

226 Args: 

227 subject: The subject URI 

228 triples: List of triples for the subject 

229 valid_predicates_info: List of valid predicates for the subject 

230 historical_snapshot: Optional historical snapshot graph 

231 highest_priority_class: The highest priority class URI for the subject 

232 highest_priority_shape: The highest priority shape URI for the subject 

233  

234 Returns: 

235 Tuple of grouped triples, relevant properties, and fetched values map 

236  

237 Note: 

238 relevant_properties contains all properties that should be considered  

239 "relevant" for UI operations (adding/deleting). This includes: 

240 - Properties configured in display rules when rules exist and match 

241 - ALL valid properties when no display rules exist or no rules match 

242 This ensures users can always interact with entities even without display rules. 

243 """ 

244 display_rules = get_display_rules() 

245 form_fields = get_form_fields() 

246 

247 grouped_triples = OrderedDict() 

248 relevant_properties = set() 

249 fetched_values_map = dict() 

250 

251 matching_rule = find_matching_rule(highest_priority_class, highest_priority_shape, display_rules) 

252 matching_form_field = form_fields.get((highest_priority_class, highest_priority_shape)) 

253 

254 ordered_properties = [] 

255 if display_rules and matching_rule: 

256 for prop_config in matching_rule.get("displayProperties", []): 

257 if prop_config.get("isVirtual"): 

258 prop_uri = prop_config.get("displayName") 

259 else: 

260 prop_uri = prop_config.get("property") 

261 if prop_uri and prop_uri not in ordered_properties: 

262 ordered_properties.append(prop_uri) 

263 

264 for prop_uri in valid_predicates_info: 

265 if prop_uri not in ordered_properties: 

266 ordered_properties.append(prop_uri) 

267 

268 for prop_uri in ordered_properties: 

269 current_prop_config = None 

270 

271 if display_rules and matching_rule: 

272 for prop_config in matching_rule.get("displayProperties", []): 

273 config_identifier = prop_config.get("displayName") if prop_config.get("isVirtual") else prop_config.get("property") 

274 if config_identifier == prop_uri: 

275 current_prop_config = prop_config 

276 break 

277 

278 current_form_field = matching_form_field.get(prop_uri) if matching_form_field else None 

279 

280 if current_prop_config: 

281 if "displayRules" in current_prop_config: 

282 is_ordered = "orderedBy" in current_prop_config 

283 order_property = current_prop_config.get("orderedBy") 

284 

285 for display_rule_nested in current_prop_config["displayRules"]: 

286 display_name_nested = display_rule_nested.get( 

287 "displayName", prop_uri 

288 ) 

289 relevant_properties.add(prop_uri) 

290 object_shape = display_rule_nested.get("shape") 

291 if current_prop_config.get("isVirtual"): 

292 process_virtual_property_display( 

293 display_name_nested, 

294 current_prop_config, 

295 subject, 

296 grouped_triples, 

297 fetched_values_map, 

298 historical_snapshot, 

299 highest_priority_shape, 

300 highest_priority_class 

301 ) 

302 else: 

303 process_display_rule( 

304 display_name_nested, 

305 prop_uri, 

306 display_rule_nested, 

307 subject, 

308 triples, 

309 grouped_triples, 

310 fetched_values_map, 

311 historical_snapshot, 

312 highest_priority_shape, 

313 object_shape, 

314 highest_priority_class 

315 ) 

316 if is_ordered and not current_prop_config.get("isVirtual", False): 

317 grouped_triples[display_name_nested]["is_draggable"] = True 

318 grouped_triples[display_name_nested]["ordered_by"] = order_property 

319 process_ordering( 

320 subject, 

321 current_prop_config, 

322 order_property, 

323 grouped_triples, 

324 display_name_nested, 

325 fetched_values_map, 

326 historical_snapshot, 

327 ) 

328 

329 # Ensure the grouped_triples entry exists 

330 if display_name_nested not in grouped_triples: 

331 grouped_triples[display_name_nested] = { 

332 "property": prop_uri, 

333 "triples": [], 

334 "subjectClass": highest_priority_class, 

335 "subjectShape": highest_priority_shape, 

336 "objectShape": display_rule_nested.get("shape") 

337 } 

338 

339 if "intermediateRelation" in display_rule_nested or "intermediateRelation" in current_prop_config: 

340 # Set intermediateRelation from the appropriate source 

341 if "intermediateRelation" in display_rule_nested: 

342 grouped_triples[display_name_nested]["intermediateRelation"] = display_rule_nested["intermediateRelation"] 

343 else: # Must be in current_prop_config based on the if condition 

344 grouped_triples[display_name_nested]["intermediateRelation"] = current_prop_config["intermediateRelation"] 

345 

346 else: 

347 display_name_simple = current_prop_config.get("displayName", prop_uri) 

348 # Only add non-virtual properties to relevant_properties 

349 # Virtual properties are handled separately in entity.py 

350 if not current_prop_config.get("isVirtual"): 

351 relevant_properties.add(prop_uri) 

352 

353 object_shape = None 

354 if current_form_field: 

355 for form_field in current_form_field: 

356 object_shape = form_field.get("nodeShape") 

357 break 

358 

359 if current_prop_config.get("isVirtual"): 

360 process_virtual_property_display( 

361 display_name_simple, 

362 current_prop_config, 

363 subject, 

364 grouped_triples, 

365 fetched_values_map, 

366 historical_snapshot, 

367 highest_priority_shape, 

368 highest_priority_class 

369 ) 

370 else: 

371 process_display_rule( 

372 display_name_simple, 

373 prop_uri, 

374 current_prop_config, 

375 subject, 

376 triples, 

377 grouped_triples, 

378 fetched_values_map, 

379 historical_snapshot, 

380 highest_priority_shape, 

381 object_shape, 

382 highest_priority_class 

383 ) 

384 if "orderedBy" in current_prop_config and not current_prop_config.get("isVirtual", False): 

385 if display_name_simple not in grouped_triples: 

386 grouped_triples[display_name_simple] = {"property": prop_uri, "triples": [], "subjectClass": highest_priority_class, "subjectShape": highest_priority_shape, "objectShape": current_prop_config.get("shape")} 

387 grouped_triples[display_name_simple]["is_draggable"] = True 

388 grouped_triples[display_name_simple]["ordered_by"] = current_prop_config.get("orderedBy") 

389 process_ordering( 

390 subject, 

391 current_prop_config, 

392 current_prop_config.get("orderedBy"), 

393 grouped_triples, 

394 display_name_simple, 

395 fetched_values_map, 

396 historical_snapshot, 

397 highest_priority_shape 

398 ) 

399 if "intermediateRelation" in current_prop_config: 

400 if display_name_simple not in grouped_triples: 

401 grouped_triples[display_name_simple] = {"property": prop_uri, "triples": [], "subjectClass": highest_priority_class, "subjectShape": highest_priority_shape, "objectShape": current_prop_config.get("shape")} 

402 grouped_triples[display_name_simple]["intermediateRelation"] = current_prop_config["intermediateRelation"] 

403 else: 

404 # Property without specific configuration - add to relevant_properties 

405 # Don't process properties without configuration (they are not virtual in this case) 

406 relevant_properties.add(prop_uri) 

407 process_default_property(prop_uri, triples, grouped_triples, highest_priority_shape, highest_priority_class) 

408 else: 

409 # No display rules or no matching rule - add all properties to relevant_properties 

410 relevant_properties.add(prop_uri) 

411 process_default_property(prop_uri, triples, grouped_triples, highest_priority_shape, highest_priority_class) 

412 

413 grouped_triples = OrderedDict(grouped_triples) 

414 return grouped_triples, relevant_properties 

415 

416 

417def process_display_rule( 

418 display_name, 

419 prop_uri, 

420 rule, 

421 subject, 

422 triples, 

423 grouped_triples, 

424 fetched_values_map, 

425 historical_snapshot=None, 

426 subject_shape=None, 

427 object_shape=None, 

428 subject_class=None, 

429): 

430 if display_name not in grouped_triples: 

431 grouped_triples[display_name] = { 

432 "property": prop_uri, 

433 "triples": [], 

434 "subjectClass": subject_class, 

435 "subjectShape": subject_shape, 

436 "objectShape": object_shape, 

437 "intermediateRelation": rule.get("intermediateRelation"), 

438 } 

439 for triple in triples: 

440 if str(triple[1]) == prop_uri: 

441 if rule.get("fetchValueFromQuery"): 

442 if historical_snapshot: 

443 result, external_entity = execute_historical_query( 

444 rule["fetchValueFromQuery"], 

445 subject, 

446 triple[2], 

447 historical_snapshot, 

448 ) 

449 else: 

450 result, external_entity = execute_sparql_query( 

451 rule["fetchValueFromQuery"], subject, triple[2] 

452 ) 

453 if result: 

454 fetched_values_map[str(result)] = str(triple[2]) 

455 new_triple = (str(triple[0]), str(triple[1]), str(result)) 

456 object_uri = str(triple[2]) 

457 new_triple_data = { 

458 "triple": new_triple, 

459 "external_entity": external_entity, 

460 "object": object_uri, 

461 "subjectClass": subject_class, 

462 "subjectShape": subject_shape, 

463 "objectShape": object_shape, 

464 } 

465 grouped_triples[display_name]["triples"].append(new_triple_data) 

466 else: 

467 if str(triple[1]) == 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type': 

468 from heritrace.utils.shacl_utils import determine_shape_for_classes 

469 object_class_shape = determine_shape_for_classes([triple[2]]) 

470 result = get_custom_filter().human_readable_class((triple[2], object_class_shape)) 

471 else: 

472 result = triple[2] 

473 

474 object_uri = str(triple[2]) 

475 

476 new_triple_data = { 

477 "triple": (str(triple[0]), str(triple[1]), result), 

478 "object": object_uri, 

479 "subjectClass": subject_class, 

480 "subjectShape": subject_shape, 

481 "objectShape": object_shape, 

482 } 

483 grouped_triples[display_name]["triples"].append(new_triple_data) 

484 

485 

486def process_virtual_property_display( 

487 display_name: str, 

488 prop_config: dict, 

489 subject: URIRef, 

490 grouped_triples: OrderedDict, 

491 fetched_values_map: dict, 

492 historical_snapshot: Optional[Graph] = None, 

493 subject_shape: Optional[str] = None, 

494 subject_class: Optional[str] = None 

495): 

496 """Process virtual properties by querying for entities that reference the current entity.""" 

497 

498 implementation = prop_config.get("implementedVia", {}) 

499 field_overrides = implementation.get("fieldOverrides", {}) 

500 target = implementation.get("target", {}) 

501 target_class = target.get("class") 

502 

503 # Find which field should reference the current entity 

504 reference_field = None 

505 for field_uri, override in field_overrides.items(): 

506 if override.get("value") == "${currentEntity}": 

507 reference_field = field_uri 

508 break 

509 

510 if not reference_field: 

511 return 

512 

513 decoded_subject = unquote(str(subject)) 

514 

515 # Query for entities that reference the current entity via the reference field 

516 query = f""" 

517 SELECT DISTINCT ?entity 

518 WHERE {{ 

519 ?entity <{reference_field}> <{decoded_subject}> . 

520 """ 

521 

522 if target_class: 

523 query += f""" 

524 ?entity a <{target_class}> . 

525 """ 

526 

527 query += """ 

528 } 

529 """ 

530 

531 if historical_snapshot: 

532 # Execute query on historical snapshot 

533 results = list(historical_snapshot.query(query)) 

534 entity_uris = [str(row[0]) for row in results] 

535 else: 

536 # Execute query on live triplestore 

537 sparql = get_sparql() 

538 sparql.setQuery(query) 

539 sparql.setReturnFormat(JSON) 

540 results = sparql.query().convert().get("results", {}).get("bindings", []) 

541 entity_uris = [res["entity"]["value"] for res in results] 

542 

543 # Now fetch display values for these entities if fetchValueFromQuery is configured 

544 

545 if prop_config.get("fetchValueFromQuery") and entity_uris: 

546 

547 if display_name not in grouped_triples: 

548 grouped_triples[display_name] = { 

549 "property": display_name, # Use display name as identifier for virtual properties 

550 "triples": [], 

551 "subjectClass": subject_class, 

552 "subjectShape": subject_shape, 

553 "objectShape": None, # Should be None for virtual properties to match key format 

554 "is_virtual": True 

555 } 

556 

557 for entity_uri in entity_uris: 

558 # Execute the fetch query for each entity 

559 if historical_snapshot: 

560 result, external_entity = execute_historical_query( 

561 prop_config["fetchValueFromQuery"], 

562 subject, 

563 URIRef(entity_uri), 

564 historical_snapshot 

565 ) 

566 else: 

567 result, external_entity = execute_sparql_query( 

568 prop_config["fetchValueFromQuery"], 

569 str(subject), 

570 entity_uri 

571 ) 

572 

573 if result: 

574 fetched_values_map[str(result)] = entity_uri 

575 new_triple_data = { 

576 "triple": (str(subject), display_name, str(result)), 

577 "external_entity": external_entity, 

578 "object": entity_uri, 

579 "subjectClass": subject_class, 

580 "subjectShape": subject_shape, 

581 "objectShape": target.get("shape"), 

582 "is_virtual": True 

583 } 

584 grouped_triples[display_name]["triples"].append(new_triple_data) 

585 else: 

586 # Even if no entities are found, we should still create the entry for virtual properties 

587 # so they can be added via the interface 

588 

589 if display_name not in grouped_triples: 

590 grouped_triples[display_name] = { 

591 "property": display_name, # Use display name as identifier for virtual properties 

592 "triples": [], 

593 "subjectClass": subject_class, 

594 "subjectShape": subject_shape, 

595 "objectShape": None, # Should be None for virtual properties to match key format 

596 "is_virtual": True 

597 } 

598 

599 

600def execute_sparql_query(query: str, subject: str, value: str) -> Tuple[str, str]: 

601 sparql = get_sparql() 

602 

603 decoded_subject = unquote(subject) 

604 decoded_value = unquote(value) 

605 query = query.replace("[[subject]]", f"<{decoded_subject}>") 

606 query = query.replace("[[value]]", f"<{decoded_value}>") 

607 sparql.setQuery(query) 

608 sparql.setReturnFormat(JSON) 

609 results = sparql.query().convert().get("results", {}).get("bindings", []) 

610 if results: 

611 parsed_query = parseQuery(query) 

612 algebra_query = translateQuery(parsed_query).algebra 

613 variable_order = algebra_query["PV"] 

614 result = results[0] 

615 values = [ 

616 result.get(str(var_name), {}).get("value", None) 

617 for var_name in variable_order 

618 ] 

619 first_value = values[0] if len(values) > 0 else None 

620 second_value = values[1] if len(values) > 1 else None 

621 return (first_value, second_value) 

622 return None, None 

623 

624 

625def process_ordering( 

626 subject, 

627 prop, 

628 order_property, 

629 grouped_triples, 

630 display_name, 

631 fetched_values_map, 

632 historical_snapshot: ConjunctiveGraph | Graph | None = None, 

633): 

634 def get_ordered_sequence(order_results): 

635 order_map = {} 

636 for res in order_results: 

637 if isinstance(res, dict): # For live triplestore results 

638 ordered_entity = res["orderedEntity"]["value"] 

639 next_value = res["nextValue"]["value"] 

640 else: # For historical snapshot results 

641 ordered_entity = str(res[0]) 

642 next_value = str(res[1]) 

643 

644 order_map[str(ordered_entity)] = ( 

645 None if str(next_value) == "NONE" else str(next_value) 

646 ) 

647 

648 all_sequences = [] 

649 start_elements = set(order_map.keys()) - set(order_map.values()) 

650 while start_elements: 

651 sequence = [] 

652 current_element = start_elements.pop() 

653 while current_element in order_map: 

654 sequence.append(current_element) 

655 current_element = order_map[current_element] 

656 all_sequences.append(sequence) 

657 return all_sequences 

658 

659 decoded_subject = unquote(subject) 

660 

661 sparql = get_sparql() 

662 

663 order_query = f""" 

664 SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue) 

665 WHERE {{ 

666 <{decoded_subject}> <{prop['property']}> ?orderedEntity. 

667 OPTIONAL {{ 

668 ?orderedEntity <{order_property}> ?next. 

669 }} 

670 }} 

671 """ 

672 if historical_snapshot: 

673 order_results = list(historical_snapshot.query(order_query)) 

674 else: 

675 sparql.setQuery(order_query) 

676 sparql.setReturnFormat(JSON) 

677 order_results = sparql.query().convert().get("results", {}).get("bindings", []) 

678 

679 order_sequences = get_ordered_sequence(order_results) 

680 for sequence in order_sequences: 

681 grouped_triples[display_name]["triples"].sort( 

682 key=lambda x: ( 

683 sequence.index( 

684 fetched_values_map.get(str(x["triple"][2]), str(x["triple"][2])) 

685 ) 

686 if fetched_values_map.get(str(x["triple"][2]), str(x["triple"][2])) 

687 in sequence 

688 else float("inf") 

689 ) 

690 ) 

691 

692 

693def process_default_property(prop_uri, triples, grouped_triples, subject_shape=None, subject_class=None): 

694 display_name = prop_uri 

695 grouped_triples[display_name] = { 

696 "property": prop_uri, 

697 "triples": [], 

698 "subjectClass": subject_class, 

699 "subjectShape": subject_shape, 

700 "objectShape": None 

701 } 

702 triples_for_prop = [triple for triple in triples if str(triple[1]) == prop_uri] 

703 for triple in triples_for_prop: 

704 new_triple_data = { 

705 "triple": (str(triple[0]), str(triple[1]), str(triple[2])), 

706 "object": str(triple[2]), 

707 "subjectClass": subject_class, 

708 "subjectShape": subject_shape, 

709 "objectShape": None, 

710 } 

711 grouped_triples[display_name]["triples"].append(new_triple_data) 

712 

713 

714def execute_historical_query( 

715 query: str, subject: str, value: str, historical_snapshot: Graph 

716) -> Tuple[str, str]: 

717 decoded_subject = unquote(subject) 

718 decoded_value = unquote(value) 

719 query = query.replace("[[subject]]", f"<{decoded_subject}>") 

720 query = query.replace("[[value]]", f"<{decoded_value}>") 

721 results = historical_snapshot.query(query) 

722 if results: 

723 for result in results: 

724 if len(result) == 2: 

725 return (str(result[0]), str(result[1])) 

726 return None, None 

727 

728 

729def get_property_order_from_rules(highest_priority_class: str, shape_uri: str = None): 

730 """ 

731 Extract ordered list of properties from display rules for given entity class and optionally a shape. 

732 

733 Args: 

734 highest_priority_class: The highest priority class for the entity 

735 shape_uri: Optional shape URI for the entity 

736 

737 Returns: 

738 List of property URIs in the order specified by display rules 

739 """ 

740 if not highest_priority_class: 

741 return [] 

742 

743 rule = find_matching_rule(highest_priority_class, shape_uri) 

744 if not rule: 

745 return [] 

746 

747 ordered_properties = [] 

748 for prop in rule.get("displayProperties", []): 

749 if isinstance(prop, dict) and "property" in prop: 

750 ordered_properties.append(prop["property"]) 

751 

752 return ordered_properties 

753 

754 

755def get_predicate_ordering_info(predicate_uri: str, highest_priority_class: str, entity_shape: str = None) -> Optional[str]: 

756 """ 

757 Check if a predicate is ordered and return its ordering property. 

758  

759 Args: 

760 predicate_uri: URI of the predicate to check 

761 highest_priority_class: The highest priority class for the subject entity 

762 entity_shape: Optional shape for the subject entity 

763  

764 Returns: 

765 The ordering property URI if the predicate is ordered, None otherwise 

766 """ 

767 display_rules = get_display_rules() 

768 if not display_rules: 

769 return None 

770 

771 rule = find_matching_rule(highest_priority_class, entity_shape, display_rules) 

772 if not rule: 

773 return None 

774 

775 for prop in rule.get("displayProperties", []): 

776 if isinstance(prop, dict) and prop.get("property") == predicate_uri: 

777 return prop.get("orderedBy") 

778 

779 return None 

780 

781 

782def get_shape_order_from_display_rules(highest_priority_class: str, entity_shape: str, predicate_uri: str) -> list: 

783 """ 

784 Get the ordered list of shapes for a specific predicate from display rules. 

785  

786 Args: 

787 highest_priority_class: The highest priority class for the entity 

788 entity_shape: The shape for the subject entity 

789 predicate_uri: The predicate URI to get shape ordering for 

790  

791 Returns: 

792 List of shape URIs in the order specified in displayRules, or empty list if no rules found 

793 """ 

794 display_rules = get_display_rules() 

795 if not display_rules: 

796 return [] 

797 

798 rule = find_matching_rule(highest_priority_class, entity_shape, display_rules) 

799 if not rule or "displayProperties" not in rule: 

800 return [] 

801 

802 for prop_config in rule["displayProperties"]: 

803 if prop_config["property"] == predicate_uri: 

804 if "displayRules" in prop_config: 

805 return [display_rule.get("shape") for display_rule in prop_config["displayRules"] 

806 if display_rule.get("shape")] 

807 

808 return [] 

809 

810 

811def get_similarity_properties(entity_key: Tuple[str, str]) -> Optional[List[Union[str, Dict[str, List[str]]]]]: 

812 """Gets the similarity properties configuration for a given entity key. 

813 

814 This configuration specifies which properties should be used for similarity matching 

815 using a list-based structure supporting OR logic between elements and 

816 nested AND logic within elements. 

817 

818 Example structures: 

819 - ['prop1', 'prop2'] # prop1 OR prop2 

820 - [{'and': ['prop3', 'prop4']}] # prop3 AND prop4 

821 - ['prop1', {'and': ['prop2', 'prop3']}] # prop1 OR (prop2 AND prop3) 

822 

823 Args: 

824 entity_key: A tuple (class_uri, shape_uri) 

825 

826 Returns: 

827 A list where each element is either a property URI string or a dictionary 

828 {'and': [list_of_property_uris]}, representing the boolean logic. 

829 Returns None if no configuration is found or if the structure is invalid. 

830 """ 

831 class_uri = entity_key[0] 

832 shape_uri = entity_key[1] 

833 

834 # Find the matching rule 

835 rule = find_matching_rule(class_uri, shape_uri) 

836 if not rule: 

837 return None 

838 

839 similarity_props = rule.get("similarity_properties") 

840 

841 if not similarity_props or not isinstance(similarity_props, list): 

842 return None 

843 

844 # Validate each element in the list. 

845 validated_props = [] 

846 for item in similarity_props: 

847 if isinstance(item, str): 

848 validated_props.append(item) 

849 elif isinstance(item, dict) and len(item) == 1 and "and" in item: 

850 and_list = item["and"] 

851 if isinstance(and_list, list) and and_list and all(isinstance(p, str) for p in and_list): 

852 validated_props.append(item) 

853 else: 

854 print( 

855 f"Warning: Invalid 'and' group in similarity_properties" + 

856 (f" for class {class_uri}" if class_uri else "") + 

857 (f" with shape {shape_uri}" if shape_uri else "") + 

858 f". Expected {{'and': ['prop_uri', ...]}} with a non-empty list of strings." 

859 ) 

860 return None # Invalid 'and' group structure 

861 else: 

862 print( 

863 f"Warning: Invalid item format in similarity_properties list" + 

864 (f" for class {class_uri}" if class_uri else "") + 

865 (f" with shape {shape_uri}" if shape_uri else "") + 

866 f". Expected a property URI string or {{'and': [...]}} dict." 

867 ) 

868 return None # Invalid item type 

869 

870 return validated_props if validated_props else None # Return validated list or None if empty after validation