Coverage for heritrace / utils / display_rules_utils.py: 94%

398 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import logging 

8from collections import OrderedDict 

9from dataclasses import dataclass 

10from typing import TYPE_CHECKING 

11from urllib.parse import unquote 

12 

13from rdflib import Graph, Literal, URIRef 

14from rdflib.plugins.sparql.algebra import translateQuery 

15from rdflib.plugins.sparql.parser import parseQuery 

16from SPARQLWrapper import JSON 

17 

18from heritrace.extensions import ( 

19 get_custom_filter, 

20 get_display_rules, 

21 get_form_fields, 

22 get_sparql, 

23 get_sparql_bindings, 

24 select_results, 

25) 

26 

27if TYPE_CHECKING: 

28 from rdflib.query import ResultRow 

29 

30 

@dataclass(slots=True)
class GroupingContext:
    """Mutable state shared by the helpers that group an entity's triples for display."""

    # Entity whose triples are being grouped; substituted into the SPARQL queries.
    subject: URIRef
    # (subject, predicate, object) triples that are scanned for each property.
    triples: list[tuple[URIRef, URIRef, URIRef | Literal]]
    # Display name -> group dict ("property", "triples", "subjectClass", ...).
    grouped_triples: OrderedDict
    # Fetched display value (as str) -> original object / entity URI it replaced;
    # process_ordering uses it to map displayed values back when sorting.
    fetched_values_map: dict[str, str]
    # Property URIs collected while the display properties are processed.
    relevant_properties: set
    # When set, queries run against this graph instead of the live SPARQL endpoint.
    historical_snapshot: Graph | None
    # Copied into every group / triple entry as "subjectClass" and "subjectShape".
    highest_priority_class: str | None
    highest_priority_shape: str | None

41 

42 

# Number of columns a historical query row must have for
# execute_historical_query to return it as a (first, second) pair.
_SUBJECT_LABEL_PAIR_LENGTH = 2

44 

45 

def find_matching_rule(
    class_uri: str | None = None,
    shape_uri: str | None = None,
    rules: list[dict] | None = None,
) -> dict | None:
    """
    Find the most appropriate rule for a given class and/or shape.
    At least one of class_uri or shape_uri must be provided.

    Selection order:
        1. The first rule whose target names both the given class and the
           given shape is returned immediately.
        2. Otherwise the best class-only rule and the best shape-only rule
           are tracked independently (lower ``priority`` wins, the first
           rule wins ties, a missing ``priority`` counts as 0).
        3. The shape-only rule is returned when its priority is lower than
           or equal to the class-only rule's; otherwise the class-only rule.

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        rules: Optional list of rules to search in, defaults to global display_rules

    Returns:
        The matching rule or None if no match is found
    """
    if not rules:
        rules = get_display_rules()
        if not rules:
            return None

    class_key = str(class_uri) if class_uri else None
    shape_key = str(shape_uri) if shape_uri else None

    # Each kind of partial match keeps its own best candidate. A single shared
    # "best priority" would let a shape rule seen in between loosen the bar
    # for class rules (and vice versa).
    class_match = None
    shape_match = None

    for rule in rules:
        # A rule without a target cannot match anything; skip it.
        target = rule.get("target") or {}
        rule_priority = rule.get("priority", 0)
        targets_class = "class" in target
        targets_shape = "shape" in target

        # Case 1: both class and shape match - exact match wins outright.
        if (
            class_key
            and shape_key
            and targets_class
            and targets_shape
            and target["class"] == class_key
            and target["shape"] == shape_key
        ):
            return rule

        # Case 2: class-only rule matching the class.
        if (
            class_key
            and targets_class
            and not targets_shape
            and target["class"] == class_key
        ):
            if class_match is None or rule_priority < class_match.get("priority", 0):
                class_match = rule

        # Case 3: shape-only rule matching the shape.
        elif (
            shape_key
            and targets_shape
            and not targets_class
            and target["shape"] == shape_key
        ):
            if shape_match is None or rule_priority < shape_match.get("priority", 0):
                shape_match = rule

    # Shape rules are treated as more specific, so they win on equal priority.
    if shape_match is not None and (
        class_match is None
        or shape_match.get("priority", 0) <= class_match.get("priority", 0)
    ):
        return shape_match
    return class_match

122 

123 

def get_class_priority(entity_key: tuple[str, str | None]) -> float:
    """
    Return the display priority for an entity key.

    The priority is read from the display rule matching the key; a matching
    rule without a ``priority`` counts as 0. Keys with no matching rule get
    ``float("inf")``, i.e. the lowest priority.

    Args:
        entity_key: A tuple (class_uri, shape_uri)
    """
    matched_rule = find_matching_rule(entity_key[0], entity_key[1])
    if not matched_rule:
        return float("inf")
    return matched_rule.get("priority", 0)

138 

139 

def is_entity_type_visible(entity_key: tuple[str, str | None]) -> bool:
    """
    Tell whether an entity type should be displayed.

    The answer is the ``shouldBeDisplayed`` flag of the matching display
    rule; it defaults to True when the flag or the rule is missing.

    Args:
        entity_key: A tuple (class_uri, shape_uri)
    """
    matched_rule = find_matching_rule(entity_key[0], entity_key[1])
    if not matched_rule:
        return True
    return matched_rule.get("shouldBeDisplayed", True)

152 

153 

def get_sortable_properties(entity_key: tuple[str, str | None]) -> list[dict[str, str]]:
    """
    Gets the sortable properties from display rules for an entity type and/or shape.
    Infers the sorting type from the form fields.

    Each returned dict is a copy of a ``sortableBy`` entry of the matching
    rule, extended with ``sortType`` and, when a display property with the
    same ``property`` exists, ``displayName``.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        List of dictionaries with sorting information
    """
    display_rules = get_display_rules()
    if not display_rules:
        return []

    form_fields = get_form_fields()

    rule = find_matching_rule(entity_key[0], entity_key[1], display_rules)
    if not rule or "sortableBy" not in rule:
        return []

    # A rule may declare sortableBy without displayProperties.
    display_properties = rule.get("displayProperties", [])
    entity_fields = (form_fields.get(entity_key) or {}) if form_fields else {}

    sort_props = []
    for sort_config in rule["sortableBy"]:
        prop = sort_config.copy()
        prop_uri = prop["property"]

        for display_prop in display_properties:
            # Virtual display properties may carry no "property" key at all,
            # so look it up defensively instead of indexing.
            if display_prop.get("property") != prop_uri:
                continue
            nested_rules = display_prop.get("displayRules")
            if nested_rules:
                # With nested rules the first one provides the label.
                prop["displayName"] = nested_rules[0].get("displayName", prop_uri)
            else:
                prop["displayName"] = display_prop.get("displayName", prop_uri)
            break

        # Default to string sorting unless the form field says otherwise.
        prop["sortType"] = "string"
        field_definitions = entity_fields.get(prop_uri)
        if field_definitions:
            # Take the first field definition
            prop["sortType"] = determine_sort_type(field_definitions[0])

        sort_props.append(prop)

    return sort_props

207 

208 

def determine_sort_type(field_info: dict) -> str:
    """Map a form field definition to "string", "date", "number" or "boolean".

    Fields with a ``nodeShape`` are always "string". Otherwise the first entry
    of ``datatypes`` is lower-cased and searched for known keywords, checking
    date/time first, then numeric, then boolean. Anything else is "string".
    """
    if field_info.get("nodeShape"):
        return "string"

    datatypes = field_info.get("datatypes")
    if not datatypes:
        return "string"

    primary_datatype = str(datatypes[0]).lower()
    keyword_table = (
        ("date", ("date", "time")),
        ("number", ("int", "float", "decimal", "double", "number")),
        ("boolean", ("boolean",)),
    )
    for sort_type, keywords in keyword_table:
        if any(keyword in primary_datatype for keyword in keywords):
            return sort_type
    return "string"

225 

226 

def get_highest_priority_class(subject_classes: list[str]) -> str | None:
    """
    Pick the class with the best (numerically lowest) display priority.

    Each class is paired with the shape determined for it and ranked through
    get_class_priority. When no class ranks below infinity, the first class
    of the list is returned.

    Args:
        subject_classes: List of class URIs

    Returns:
        The highest priority class (as str) or None if no classes are provided
    """
    from heritrace.utils.shacl_utils import determine_shape_for_classes  # noqa: PLC0415

    if not subject_classes:
        return None

    best_class = None
    best_priority = float("inf")
    for candidate in map(str, subject_classes):
        candidate_shape = determine_shape_for_classes([candidate])
        candidate_priority = get_class_priority((candidate, candidate_shape))
        if candidate_priority < best_priority:
            best_priority, best_class = candidate_priority, candidate

    if best_class is None:
        return str(subject_classes[0])
    return best_class

258 

259 

260def _ensure_grouped_entry( 

261 ctx: GroupingContext, 

262 display_name: str, 

263 prop_uri: str, 

264 object_shape: str | None, 

265) -> None: 

266 if display_name not in ctx.grouped_triples: 

267 ctx.grouped_triples[display_name] = { 

268 "property": prop_uri, 

269 "triples": [], 

270 "subjectClass": ctx.highest_priority_class, 

271 "subjectShape": ctx.highest_priority_shape, 

272 "objectShape": object_shape, 

273 } 

274 

275 

def _apply_ordering_to_group(
    ctx: GroupingContext,
    current_prop_config: dict,
    order_property: str | None,
    display_name: str,
) -> None:
    """Flag an existing group as draggable/ordered and sort its triples."""
    group = ctx.grouped_triples[display_name]
    group["is_draggable"] = True
    group["ordered_by"] = order_property
    process_ordering(ctx, current_prop_config, order_property, display_name)

290 

291 

292def _apply_intermediate_relation( 

293 grouped_triples: OrderedDict, 

294 display_name: str, 

295 *sources: dict, 

296) -> None: 

297 for source in sources: 

298 if "intermediateRelation" in source: 

299 grouped_triples[display_name]["intermediateRelation"] = source[ 

300 "intermediateRelation" 

301 ] 

302 return 

303 

304 

def _process_property_with_nested_display_rules(
    prop_uri: str,
    current_prop_config: dict,
    ctx: GroupingContext,
) -> None:
    """Process a display property that carries a list of nested "displayRules".

    Every nested rule yields its own group, named by the rule's "displayName"
    (falling back to prop_uri). Virtual properties are delegated to
    process_virtual_property_display; real ones to process_display_rule and,
    when the property has "orderedBy", to the ordering helper.
    """
    has_ordering = "orderedBy" in current_prop_config
    ordering_property = current_prop_config.get("orderedBy")

    for nested_rule in current_prop_config["displayRules"]:
        nested_name = nested_rule.get("displayName", prop_uri)
        ctx.relevant_properties.add(prop_uri)
        nested_shape = nested_rule.get("shape")

        if current_prop_config.get("isVirtual"):
            process_virtual_property_display(nested_name, current_prop_config, ctx)
        else:
            process_display_rule(
                nested_name,
                prop_uri,
                nested_rule,
                ctx,
                object_shape=nested_shape,
            )
            # Only non-virtual properties reach this branch, so the ordering
            # flag alone decides whether the group gets sorted.
            if has_ordering:
                _apply_ordering_to_group(
                    ctx,
                    current_prop_config,
                    ordering_property,
                    nested_name,
                )

        # The virtual path may return without creating the group.
        _ensure_grouped_entry(ctx, nested_name, prop_uri, nested_rule.get("shape"))

        # The nested rule takes precedence over the property-level config.
        _apply_intermediate_relation(
            ctx.grouped_triples,
            nested_name,
            nested_rule,
            current_prop_config,
        )

352 

353 

def _process_property_with_simple_config(
    prop_uri: str,
    current_prop_config: dict,
    current_form_field: list[dict] | None,
    ctx: GroupingContext,
) -> None:
    """Process a display property configured without nested "displayRules".

    The group is named by the config's "displayName" (falling back to
    prop_uri); the object shape comes from the first form field definition.
    """
    is_virtual = current_prop_config.get("isVirtual")
    group_name = current_prop_config.get("displayName", prop_uri)

    # Only add non-virtual properties to relevant_properties
    # Virtual properties are handled separately in entity.py
    if not is_virtual:
        ctx.relevant_properties.add(prop_uri)

    # Only the first form field definition is consulted for the shape.
    object_shape = None
    if current_form_field:
        object_shape = next(iter(current_form_field)).get("nodeShape")

    if is_virtual:
        process_virtual_property_display(group_name, current_prop_config, ctx)
    else:
        process_display_rule(
            group_name,
            prop_uri,
            current_prop_config,
            ctx,
            object_shape=object_shape,
        )
        if "orderedBy" in current_prop_config:
            _ensure_grouped_entry(
                ctx, group_name, prop_uri, current_prop_config.get("shape")
            )
            _apply_ordering_to_group(
                ctx,
                current_prop_config,
                current_prop_config.get("orderedBy"),
                group_name,
            )

    if "intermediateRelation" in current_prop_config:
        _ensure_grouped_entry(
            ctx, group_name, prop_uri, current_prop_config.get("shape")
        )
        relation = current_prop_config["intermediateRelation"]
        ctx.grouped_triples[group_name]["intermediateRelation"] = relation

411 

412 

def _process_property_with_display_rules(
    prop_uri: str,
    matching_rule: dict,
    matching_form_field: dict | None,
    ctx: GroupingContext,
) -> None:
    """Dispatch one property to the handler that fits its display configuration.

    A display property is identified by its "displayName" when virtual and by
    its "property" otherwise; the first entry whose identifier equals
    prop_uri is used.
    """
    current_prop_config = next(
        (
            candidate
            for candidate in matching_rule.get("displayProperties", [])
            if (
                candidate.get("displayName")
                if candidate.get("isVirtual")
                else candidate.get("property")
            )
            == prop_uri
        ),
        None,
    )

    current_form_field = None
    if matching_form_field:
        current_form_field = matching_form_field.get(prop_uri)

    if not current_prop_config:
        # Property without specific configuration - add to relevant_properties
        # and group it with the default handling (it is not virtual here).
        ctx.relevant_properties.add(prop_uri)
        process_default_property(
            prop_uri,
            ctx.triples,
            ctx.grouped_triples,
            ctx.highest_priority_shape,
            ctx.highest_priority_class,
        )
        return

    if "displayRules" in current_prop_config:
        _process_property_with_nested_display_rules(prop_uri, current_prop_config, ctx)
        return

    _process_property_with_simple_config(
        prop_uri,
        current_prop_config,
        current_form_field,
        ctx,
    )

460 

461 

def get_grouped_triples(
    subject: URIRef,
    triples: list[tuple[URIRef, URIRef, URIRef | Literal]],
    valid_predicates_info: list[str],
    historical_snapshot: Graph | None = None,
    entity_key: tuple[str | None, str | None] = (None, None),
) -> tuple[OrderedDict, set]:
    """
    Group an entity's triples by display name according to the display rules.

    Properties listed in the matching rule's ``displayProperties`` come first,
    in rule order, followed by the remaining entries of
    ``valid_predicates_info``. Without display rules or a matching rule,
    every property gets the default grouping.

    Args:
        subject: The entity whose triples are grouped
        triples: The (subject, predicate, object) triples of the entity
        valid_predicates_info: Property URIs to include in the grouping
        historical_snapshot: Optional graph to query instead of the live endpoint
        entity_key: A tuple (class_uri, shape_uri) used to select the rule

    Returns:
        A tuple (grouped_triples, relevant_properties)
    """
    highest_priority_class, highest_priority_shape = entity_key
    display_rules = get_display_rules()
    form_fields = get_form_fields()

    ctx = GroupingContext(
        subject=subject,
        triples=triples,
        grouped_triples=OrderedDict(),
        fetched_values_map={},
        relevant_properties=set(),
        historical_snapshot=historical_snapshot,
        highest_priority_class=highest_priority_class,
        highest_priority_shape=highest_priority_shape,
    )

    matching_rule = find_matching_rule(
        highest_priority_class, highest_priority_shape, display_rules
    )
    # get_form_fields() may return nothing; in that case there is simply no
    # form field information for this entity.
    matching_form_field = (
        form_fields.get((highest_priority_class, highest_priority_shape))
        if form_fields
        else None
    )
    use_rules = bool(display_rules and matching_rule)

    rule_properties = []
    if use_rules:
        for prop_config in matching_rule.get("displayProperties", []):
            # Virtual properties have no predicate; their display name is
            # used as the identifier instead.
            identifier_key = "displayName" if prop_config.get("isVirtual") else "property"
            identifier = prop_config.get(identifier_key)
            if identifier:
                rule_properties.append(identifier)

    # dict.fromkeys drops duplicates while keeping first-seen order.
    ordered_properties = list(dict.fromkeys([*rule_properties, *valid_predicates_info]))

    for prop_uri in ordered_properties:
        if use_rules:
            _process_property_with_display_rules(
                prop_uri,
                matching_rule,
                matching_form_field,
                ctx,
            )
        else:
            # No display rules or no matching rule -
            # add all properties to relevant_properties
            ctx.relevant_properties.add(prop_uri)
            process_default_property(
                prop_uri,
                ctx.triples,
                ctx.grouped_triples,
                ctx.highest_priority_shape,
                ctx.highest_priority_class,
            )

    ctx.grouped_triples = OrderedDict(ctx.grouped_triples)
    return ctx.grouped_triples, ctx.relevant_properties

531 

532 

def process_display_rule(
    display_name: str,
    prop_uri: str,
    rule: dict,
    ctx: GroupingContext,
    object_shape: str | None = None,
) -> None:
    """Append the triples of prop_uri to the group called display_name.

    The group is created when missing. When the rule has a
    "fetchValueFromQuery", each object is replaced by the query result
    (run against the historical snapshot when there is one) and triples
    without a result are left out. Otherwise the object is kept as is,
    except for rdf:type objects, which are turned into a human-readable
    class label.
    """
    if display_name not in ctx.grouped_triples:
        ctx.grouped_triples[display_name] = {
            "property": prop_uri,
            "triples": [],
            "subjectClass": ctx.highest_priority_class,
            "subjectShape": ctx.highest_priority_shape,
            "objectShape": object_shape,
            "intermediateRelation": rule.get("intermediateRelation"),
        }

    for triple in ctx.triples:
        predicate = str(triple[1])
        if predicate != prop_uri:
            continue

        raw_object = triple[2]

        if rule.get("fetchValueFromQuery"):
            value_query = rule["fetchValueFromQuery"]
            if ctx.historical_snapshot:
                result, external_entity = execute_historical_query(
                    value_query,
                    ctx.subject,
                    raw_object,
                    ctx.historical_snapshot,
                )
            else:
                result, external_entity = execute_sparql_query(
                    value_query, ctx.subject, raw_object
                )
            if not result:
                continue

            # Remember which object the fetched value stands for.
            ctx.fetched_values_map[str(result)] = str(raw_object)
            entry = {
                "triple": (str(triple[0]), predicate, str(result)),
                "external_entity": external_entity,
                "object": str(raw_object),
                "subjectClass": ctx.highest_priority_class,
                "subjectShape": ctx.highest_priority_shape,
                "objectShape": object_shape,
            }
            ctx.grouped_triples[display_name]["triples"].append(entry)
            continue

        if predicate == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
            from heritrace.utils.shacl_utils import (  # noqa: PLC0415
                determine_shape_for_classes,
            )

            class_shape = determine_shape_for_classes([raw_object])
            shown_value = get_custom_filter().human_readable_class(
                (raw_object, class_shape)
            )
        else:
            shown_value = raw_object

        entry = {
            "triple": (str(triple[0]), predicate, shown_value),
            "object": str(raw_object),
            "subjectClass": ctx.highest_priority_class,
            "subjectShape": ctx.highest_priority_shape,
            "objectShape": object_shape,
        }
        ctx.grouped_triples[display_name]["triples"].append(entry)

599 

600 

def _fetch_virtual_property_entities(
    reference_field: str,
    target_class: str | None,
    ctx: GroupingContext,
) -> list[str]:
    """Return the URIs of entities that point at the subject via reference_field.

    When target_class is given, only entities of that class are returned.
    The query runs on the historical snapshot if the context has one,
    otherwise on the live SPARQL endpoint.
    """
    decoded_subject = unquote(str(ctx.subject))

    query = f"""
        SELECT DISTINCT ?entity
        WHERE {{
            ?entity <{reference_field}> <{decoded_subject}> .
    """

    if target_class:
        query += f"""
            ?entity a <{target_class}> .
        """

    query += """
        }
    """

    if not ctx.historical_snapshot:
        endpoint = get_sparql()
        endpoint.setQuery(query)
        endpoint.setReturnFormat(JSON)
        live_bindings = get_sparql_bindings(endpoint.query().convert())
        return [binding["entity"]["value"] for binding in live_bindings]

    snapshot_rows = select_results(ctx.historical_snapshot.query(query))
    return [str(row[0]) for row in snapshot_rows]

633 

634 

def _build_virtual_property_triples(
    display_name: str,
    prop_config: dict,
    target_shape: str | None,
    entity_uris: list[str],
    ctx: GroupingContext,
) -> None:
    """Fill the virtual group display_name with one entry per resolvable entity.

    For every entity URI the property's "fetchValueFromQuery" is executed;
    entities for which the query gives no result are skipped.
    """
    if display_name not in ctx.grouped_triples:
        ctx.grouped_triples[display_name] = {
            "property": display_name,
            "triples": [],
            "subjectClass": ctx.highest_priority_class,
            "subjectShape": ctx.highest_priority_shape,
            "objectShape": None,
            "is_virtual": True,
        }

    use_snapshot = bool(ctx.historical_snapshot)
    for entity_uri in entity_uris:
        if use_snapshot:
            result, external_entity = execute_historical_query(
                prop_config["fetchValueFromQuery"],
                ctx.subject,
                URIRef(entity_uri),
                ctx.historical_snapshot,
            )
        else:
            result, external_entity = execute_sparql_query(
                prop_config["fetchValueFromQuery"], str(ctx.subject), entity_uri
            )

        if not result:
            continue

        # Remember which entity the fetched value stands for.
        ctx.fetched_values_map[str(result)] = entity_uri
        ctx.grouped_triples[display_name]["triples"].append(
            {
                "triple": (str(ctx.subject), display_name, str(result)),
                "external_entity": external_entity,
                "object": entity_uri,
                "subjectClass": ctx.highest_priority_class,
                "subjectShape": ctx.highest_priority_shape,
                "objectShape": target_shape,
                "is_virtual": True,
            }
        )

677 

678 

def process_virtual_property_display(
    display_name: str,
    prop_config: dict,
    ctx: GroupingContext,
) -> None:
    """Build the display group for a virtual property.

    The reference field is the first entry of
    ``implementedVia.fieldOverrides`` whose value is "${currentEntity}";
    without one nothing happens. Entities referencing the subject through
    that field are fetched and, if the property has a "fetchValueFromQuery"
    and entities were found, turned into triples. Otherwise an empty virtual
    group is created unless the group already exists.
    """
    implementation = prop_config.get("implementedVia", {})
    overrides = implementation.get("fieldOverrides", {})
    target = implementation.get("target", {})
    target_class = target.get("class")

    reference_field = next(
        (
            field_uri
            for field_uri, override in overrides.items()
            if override.get("value") == "${currentEntity}"
        ),
        None,
    )
    if not reference_field:
        return

    entity_uris = _fetch_virtual_property_entities(reference_field, target_class, ctx)

    if prop_config.get("fetchValueFromQuery") and entity_uris:
        _build_virtual_property_triples(
            display_name, prop_config, target.get("shape"), entity_uris, ctx
        )
        return

    if display_name in ctx.grouped_triples:
        return

    ctx.grouped_triples[display_name] = {
        "property": display_name,
        "triples": [],
        "subjectClass": ctx.highest_priority_class,
        "subjectShape": ctx.highest_priority_shape,
        "objectShape": None,
        "is_virtual": True,
    }

714 

715 

def execute_sparql_query(
    query: str, subject: str, value: str
) -> tuple[str | None, str | None]:
    """Run a value-fetching query on the live endpoint and return its first row.

    "[[subject]]" and "[[value]]" in the query are replaced by the
    percent-decoded subject and value wrapped in angle brackets. The result
    holds the values of the first two projected variables of the first
    binding, in projection order; missing ones are None. Without bindings
    the result is (None, None).
    """
    endpoint = get_sparql()

    subject_term = f"<{unquote(subject)}>"
    value_term = f"<{unquote(value)}>"
    query = query.replace("[[subject]]", subject_term).replace("[[value]]", value_term)

    endpoint.setQuery(query)
    endpoint.setReturnFormat(JSON)
    bindings = get_sparql_bindings(endpoint.query().convert())
    if not bindings:
        return None, None

    # JSON bindings are keyed by variable name, so the projection order is
    # recovered from the parsed query.
    projected_variables = translateQuery(parseQuery(query)).algebra["PV"]
    first_binding = bindings[0]
    values = [
        first_binding.get(str(variable), {}).get("value", None)
        for variable in projected_variables
    ]
    padded = [*values, None, None]
    return (padded[0], padded[1])

741 

742 

def process_ordering(
    ctx: GroupingContext,
    prop: dict,
    order_property: str | None,
    display_name: str,
) -> None:
    """Sort the triples of a group along the chain defined by order_property.

    For every object of ``prop["property"]`` the query fetches the entity that
    follows it via order_property. The resulting chains are used as sort
    order; triples whose object is not part of a chain are placed last.
    Fetched display values are mapped back to their original object through
    ``ctx.fetched_values_map`` before the lookup.

    Args:
        ctx: Grouping context holding the subject, the groups and the snapshot
        prop: Display property configuration; only "property" is read
        order_property: URI of the predicate linking an entity to the next one
        display_name: Name of the group whose triples are sorted
    """

    def get_ordered_sequence(
        order_results: list[dict[str, dict[str, str]]] | list[ResultRow],
    ) -> list[list[str]]:
        """Turn (entity, next entity) rows into chains of entity URIs."""
        order_map = {}
        for res in order_results:
            if isinstance(res, dict):  # For live triplestore results
                ordered_entity = res["orderedEntity"]["value"]
                next_value = res["nextValue"]["value"]
            else:  # For historical snapshot results
                ordered_entity = str(res[0])
                next_value = str(res[1])

            order_map[str(ordered_entity)] = (
                None if str(next_value) == "NONE" else str(next_value)
            )

        all_sequences = []
        # Chain heads are the entities that nothing points to.
        start_elements = set(order_map.keys()) - set(order_map.values())
        while start_elements:
            sequence = []
            visited = set()
            current_element = start_elements.pop()
            # Stop on revisits: inconsistent data may contain a cycle behind
            # a chain head, which would otherwise never terminate.
            while current_element in order_map and current_element not in visited:
                visited.add(current_element)
                sequence.append(current_element)
                current_element = order_map[current_element]
            all_sequences.append(sequence)
        return all_sequences

    decoded_subject = unquote(str(ctx.subject))

    sparql = get_sparql()

    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{decoded_subject}> <{prop["property"]}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """
    if ctx.historical_snapshot:
        order_results: list[dict[str, dict[str, str]]] | list[ResultRow] = list(
            select_results(ctx.historical_snapshot.query(order_query))
        )
    else:
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        order_results = get_sparql_bindings(sparql.query().convert())

    not_in_sequence = float("inf")

    for sequence in get_ordered_sequence(order_results):
        # One position lookup table per chain instead of list.index per key.
        positions = {entity: index for index, entity in enumerate(sequence)}

        def sort_key(item: dict, positions: dict = positions) -> float:
            shown_value = str(item["triple"][2])
            original_value = ctx.fetched_values_map.get(shown_value, shown_value)
            return positions.get(original_value, not_in_sequence)

        ctx.grouped_triples[display_name]["triples"].sort(key=sort_key)

810 

811 

def process_default_property(
    prop_uri: str,
    triples: list[tuple[URIRef, URIRef, URIRef | Literal]],
    grouped_triples: OrderedDict,
    subject_shape: str | None = None,
    subject_class: str | None = None,
) -> None:
    """Group the triples of prop_uri under the property URI itself.

    Any existing group with that name is replaced. All triple components are
    stored as strings and no object shape is recorded.
    """
    group = {
        "property": prop_uri,
        "triples": [],
        "subjectClass": subject_class,
        "subjectShape": subject_shape,
        "objectShape": None,
    }
    grouped_triples[prop_uri] = group

    for subject_term, predicate_term, object_term in (
        (entry[0], entry[1], entry[2]) for entry in triples
    ):
        if str(predicate_term) != prop_uri:
            continue
        group["triples"].append(
            {
                "triple": (str(subject_term), str(predicate_term), str(object_term)),
                "object": str(object_term),
                "subjectClass": subject_class,
                "subjectShape": subject_shape,
                "objectShape": None,
            }
        )

837 

838 

def execute_historical_query(
    query: str, subject: str, value: str, historical_snapshot: Graph
) -> tuple[str | None, str | None]:
    """Run a value-fetching query on a historical snapshot.

    "[[subject]]" and "[[value]]" in the query are replaced by the
    percent-decoded subject and value wrapped in angle brackets. The first
    result row with exactly two columns is returned as a pair of strings;
    a column without a value stays None. Without such a row the result is
    (None, None).
    """
    decoded_subject = unquote(subject)
    decoded_value = unquote(value)
    query = query.replace("[[subject]]", f"<{decoded_subject}>")
    query = query.replace("[[value]]", f"<{decoded_value}>")
    results = historical_snapshot.query(query)
    for row in select_results(results):
        if len(row) == _SUBJECT_LABEL_PAIR_LENGTH:
            # An unbound column must stay None: str(None) would yield the
            # truthy text "None" and be shown as if it were a real value.
            first, second = row[0], row[1]
            return (
                None if first is None else str(first),
                None if second is None else str(second),
            )
    return None, None

851 

852 

def get_property_order_from_rules(
    highest_priority_class: str | None, shape_uri: str | None = None
) -> list[str]:
    """
    List the property URIs of the matching display rule in rule order.

    Entries of ``displayProperties`` that are not dicts, are virtual
    (virtual properties have no RDF predicate) or lack a "property" key
    are left out.

    Args:
        highest_priority_class: The highest priority class for the entity
        shape_uri: Optional shape URI for the entity

    Returns:
        List of property URIs in the order specified by display rules,
        empty when there is no class or no matching rule
    """
    if not highest_priority_class:
        return []

    matched_rule = find_matching_rule(highest_priority_class, shape_uri)
    if not matched_rule:
        return []

    return [
        entry["property"]
        for entry in matched_rule.get("displayProperties", [])
        if isinstance(entry, dict)
        and not entry.get("isVirtual")
        and "property" in entry
    ]

884 

885 

def get_predicate_ordering_info(
    predicate_uri: str,
    highest_priority_class: str | None,
    entity_shape: str | None = None,
) -> str | None:
    """
    Look up the ordering property configured for a predicate.

    Only the first non-virtual display property whose "property" equals
    predicate_uri is considered.

    Args:
        predicate_uri: URI of the predicate to check
        highest_priority_class: The highest priority class for the subject entity
        entity_shape: Optional shape for the subject entity

    Returns:
        The "orderedBy" value of that display property, or None when it has
        none or when no rule / property matches
    """
    display_rules = get_display_rules()
    if not display_rules:
        return None

    matched_rule = find_matching_rule(highest_priority_class, entity_shape, display_rules)
    if not matched_rule:
        return None

    for entry in matched_rule.get("displayProperties", []):
        # Virtual properties don't have RDF predicates or ordering
        if (
            isinstance(entry, dict)
            and not entry.get("isVirtual")
            and entry.get("property") == predicate_uri
        ):
            return entry.get("orderedBy")

    return None

919 

920 

def get_shape_order_from_display_rules(
    highest_priority_class: str | None, entity_shape: str | None, predicate_uri: str
) -> list[str]:
    """
    List the shapes of a predicate's nested display rules in rule order.

    The first non-virtual display property that matches predicate_uri and
    has "displayRules" is used; nested rules without a shape are skipped.

    Args:
        highest_priority_class: The highest priority class for the entity
        entity_shape: The shape for the subject entity
        predicate_uri: The predicate URI to get shape ordering for

    Returns:
        List of shape URIs in the order specified in
        displayRules, or empty list if no rules found
    """
    display_rules = get_display_rules()
    if not display_rules:
        return []

    matched_rule = find_matching_rule(highest_priority_class, entity_shape, display_rules)
    if not matched_rule or "displayProperties" not in matched_rule:
        return []

    for entry in matched_rule["displayProperties"]:
        # Skip malformed entries, virtual properties (no RDF predicate) and
        # entries without a "property" key.
        if (
            not isinstance(entry, dict)
            or entry.get("isVirtual")
            or "property" not in entry
        ):
            continue
        if entry["property"] != predicate_uri or "displayRules" not in entry:
            continue
        candidate_shapes = (nested.get("shape") for nested in entry["displayRules"])
        return [shape for shape in candidate_shapes if shape]

    return []

959 

960 

def get_similarity_properties(
    entity_key: tuple[str, str | None],
) -> list[str | dict[str, list[str]]] | None:
    """Return the validated ``similarity_properties`` of the matching rule.

    The configuration is a list whose elements are combined with OR; an
    element is either a property URI string or a single-key dict
    ``{'and': [...]}`` whose property URIs are combined with AND.

    Example structures:
        - ['prop1', 'prop2']                      # prop1 OR prop2
        - [{'and': ['prop3', 'prop4']}]           # prop3 AND prop4
        - ['prop1', {'and': ['prop2', 'prop3']}]  # prop1 OR (prop2 AND prop3)

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The configured list when every element is valid. None when there is
        no matching rule, the configuration is missing, empty or not a list,
        or any element is malformed (a warning is logged in that case).
    """
    class_uri = entity_key[0]
    shape_uri = entity_key[1]

    matched_rule = find_matching_rule(class_uri, shape_uri)
    if not matched_rule:
        return None

    configured = matched_rule.get("similarity_properties")
    if not configured or not isinstance(configured, list):
        return None

    accepted = []
    for entry in configured:
        if isinstance(entry, str):
            accepted.append(entry)
            continue

        is_and_group = isinstance(entry, dict) and len(entry) == 1 and "and" in entry
        if not is_and_group:
            logging.getLogger(__name__).warning(
                "Invalid item format in similarity_properties"
                " list for class %s. Expected a property URI"
                " string or {'and': [...]} dict.",
                class_uri,
            )
            return None

        members = entry["and"]
        members_valid = (
            isinstance(members, list)
            and bool(members)
            and all(isinstance(member, str) for member in members)
        )
        if not members_valid:
            logging.getLogger(__name__).warning(
                "Invalid 'and' group in similarity_properties"
                " for class %s. Expected"
                " {'and': ['prop_uri', ...]} with"
                " a non-empty list of strings.",
                class_uri,
            )
            return None

        accepted.append(entry)

    # An empty result is reported as None, like a missing configuration.
    return accepted or None