Coverage for heritrace/utils/display_rules_utils.py: 100%

296 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-06-24 11:39 +0000

1from collections import OrderedDict 

2from typing import Dict, List, Optional, Tuple, Union 

3from urllib.parse import unquote 

4 

5from heritrace.extensions import (get_custom_filter, get_display_rules, 

6 get_form_fields, get_sparql) 

7from rdflib import ConjunctiveGraph, Graph, Literal, URIRef 

8from rdflib.plugins.sparql.algebra import translateQuery 

9from rdflib.plugins.sparql.parser import parseQuery 

10from SPARQLWrapper import JSON 

11 

12 

# Display rules loaded once at import time.
# NOTE(review): the functions visible in this module call get_display_rules()
# themselves rather than reading this global — confirm whether any external
# importer still relies on it before removing.
display_rules = get_display_rules()

14 

15 

def find_matching_rule(class_uri=None, shape_uri=None, rules=None):
    """
    Find the most appropriate rule for a given class and/or shape.
    At least one of class_uri or shape_uri must be provided.

    Matching precedence:
      1. A rule whose target names both the class and the shape is returned
         immediately.
      2. Otherwise the best shape-only rule and the best class-only rule are
         compared; the one with the lower ``priority`` number wins, and the
         shape rule wins ties.

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        rules: Optional list of rules to search in; when empty or None the
            rules returned by get_display_rules() are used

    Returns:
        The matching rule or None if no match is found
    """
    if not rules:
        rules = get_display_rules()
        if not rules:
            return None

    class_key = str(class_uri) if class_uri else None
    shape_key = str(shape_uri) if shape_uri else None

    # Best candidates are tracked independently: a shared "best priority so
    # far" would let a shape rule's priority influence which class rule is
    # kept (and vice versa).
    class_match = None
    shape_match = None

    for rule in rules:
        # Rules without a target cannot match anything; skip them instead of
        # raising KeyError.
        target = rule.get("target") or {}
        rule_priority = rule.get("priority", 0)

        class_hit = class_key is not None and "class" in target and target["class"] == class_key
        shape_hit = shape_key is not None and "shape" in target and target["shape"] == shape_key

        # Case 1: both class and shape match -> exact match always wins.
        if class_hit and shape_hit:
            return rule

        # Case 2: class-only rule (the rule must not also constrain a shape).
        if class_hit and "shape" not in target:
            if class_match is None or rule_priority < class_match.get("priority", 0):
                class_match = rule

        # Case 3: shape-only rule (the rule must not also constrain a class).
        elif shape_hit and "class" not in target:
            if shape_match is None or rule_priority < shape_match.get("priority", 0):
                shape_match = rule

    # Shape rules are more specific, so they are preferred on equal priority.
    if shape_match and (
        class_match is None
        or shape_match.get("priority", 0) <= class_match.get("priority", 0)
    ):
        return shape_match
    if class_match:
        return class_match

    return None

73 

74 

def get_class_priority(entity_key):
    """
    Look up the display priority configured for an entity type.

    The priority is read from the display rule that matches the given
    (class_uri, shape_uri) pair.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The rule's "priority" value, or 0 when no rule matches or the rule
        does not define one.
    """
    matched_rule = find_matching_rule(entity_key[0], entity_key[1])
    if not matched_rule:
        return 0
    return matched_rule.get("priority", 0)

88 

89 

def is_entity_type_visible(entity_key):
    """
    Tell whether entities of the given type should be displayed.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The matching rule's "shouldBeDisplayed" value; True when no rule
        matches or the rule does not set the flag.
    """
    matched_rule = find_matching_rule(entity_key[0], entity_key[1])
    if not matched_rule:
        return True
    return matched_rule.get("shouldBeDisplayed", True)

102 

103 

def get_sortable_properties(entity_key: Tuple[str, str]) -> List[Dict[str, str]]:
    """
    Gets the sortable properties from display rules for an entity type and/or shape.
    Infers the sorting type from the form fields.

    Each returned entry is a copy of the rule's ``sortableBy`` item, enriched
    with ``displayName`` (when the property is also listed in
    ``displayProperties``) and ``sortType`` (defaults to "string").

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        List of dictionaries with sorting information; empty when there are
        no display rules, no matching rule, or the rule has no "sortableBy".
    """
    display_rules = get_display_rules()
    if not display_rules:
        return []

    form_fields = get_form_fields()

    class_uri = entity_key[0]
    shape_uri = entity_key[1]

    rule = find_matching_rule(class_uri, shape_uri, display_rules)
    if not rule or "sortableBy" not in rule:
        return []

    # A rule may declare sortableBy without displayProperties; treat that as
    # "no display names available" rather than raising KeyError.
    display_properties = rule.get("displayProperties", [])

    # Field definitions for exactly this (class, shape) key, if any.
    entity_fields = {}
    if form_fields and entity_key in form_fields:
        entity_fields = form_fields[entity_key] or {}

    sort_props = []
    for sort_config in rule["sortableBy"]:
        prop = sort_config.copy()
        prop_uri = prop["property"]

        for display_prop in display_properties:
            if display_prop["property"] != prop_uri:
                continue
            nested_rules = display_prop.get("displayRules")
            if nested_rules:
                # With nested display rules the first one provides the label.
                prop["displayName"] = nested_rules[0].get("displayName", prop_uri)
            else:
                prop["displayName"] = display_prop.get("displayName", prop_uri)
            break

        # Default to string sorting unless the form fields say otherwise.
        prop["sortType"] = "string"
        field_definitions = entity_fields.get(prop_uri) if prop_uri in entity_fields else None
        if field_definitions:
            # Take the first field definition.
            prop["sortType"] = determine_sort_type(field_definitions[0])

        sort_props.append(prop)

    return sort_props

157 

158 

def determine_sort_type(field_info):
    """
    Map a form-field definition to one of the supported sort types.

    Args:
        field_info: Field definition dict; only "nodeShape" and "datatypes"
            are consulted.

    Returns:
        "date", "number", "boolean" or "string". Fields that reference
        another entity (truthy "nodeShape") always sort as "string"; otherwise
        the first datatype is matched case-insensitively by substring.
    """
    # Entity references are sorted by their label.
    if field_info.get("nodeShape"):
        return "string"

    datatypes = field_info.get("datatypes")
    if not datatypes:
        return "string"

    type_name = str(datatypes[0]).lower()

    # Order matters: earlier groups take precedence over later ones.
    keyword_groups = (
        ("date", ("date", "time")),
        ("number", ("int", "float", "decimal", "double", "number")),
        ("boolean", ("boolean",)),
    )
    for sort_type, keywords in keyword_groups:
        for keyword in keywords:
            if keyword in type_name:
                return sort_type

    return "string"

178 

179 

def get_highest_priority_class(subject_classes):
    """
    Pick the class with the best (numerically lowest) display priority.

    For every class the shape is resolved with determine_shape_for_classes
    and the priority is read through get_class_priority. On equal priority
    the class that appears first in the input is kept.

    Args:
        subject_classes: List of class URIs

    Returns:
        The winning class URI as a string, or None if no classes are provided
    """
    # Imported here rather than at module level, as in the original code.
    from heritrace.utils.shacl_utils import determine_shape_for_classes

    if not subject_classes:
        return None

    best_class = None
    best_priority = float('inf')

    for candidate in map(str, subject_classes):
        candidate_shape = determine_shape_for_classes([candidate])
        candidate_priority = get_class_priority((candidate, candidate_shape))
        if candidate_priority < best_priority:
            best_priority, best_class = candidate_priority, candidate

    return best_class

208 

209 

def get_grouped_triples(
    subject: URIRef,
    triples: List[Tuple[URIRef, URIRef, URIRef | Literal]],
    valid_predicates_info: List[str],
    historical_snapshot: Optional[Graph] = None,
    highest_priority_class: Optional[str] = None,
    highest_priority_shape: Optional[str] = None,
) -> Tuple[OrderedDict, set]:
    """
    Group the triples of a subject according to the display rules.

    Properties listed in the matching rule's "displayProperties" come first
    (in rule order), followed by the remaining valid predicates. Properties
    configured with "fetchValueFromQuery" have their values resolved through
    a query; properties with "orderedBy" are sorted and flagged as draggable.

    Args:
        subject: The subject URI
        triples: List of triples for the subject
        valid_predicates_info: List of valid predicates for the subject
        historical_snapshot: Optional historical snapshot graph
        highest_priority_class: The highest priority class URI for the subject
        highest_priority_shape: The shape URI associated with the subject

    Returns:
        A 2-tuple ``(grouped_triples, relevant_properties)``: an OrderedDict
        keyed by display name and the set of property URIs that are covered
        by a display rule. The map of fetched values is used internally for
        ordering and is not returned.
    """
    display_rules = get_display_rules()
    form_fields = get_form_fields()

    grouped_triples = OrderedDict()
    relevant_properties = set()
    # Maps a value returned by a fetch query back to the original object value.
    fetched_values_map = {}

    matching_rule = find_matching_rule(highest_priority_class, highest_priority_shape, display_rules)
    # Form fields may be unavailable; treat that as "no field information".
    matching_form_field = (
        form_fields.get((highest_priority_class, highest_priority_shape))
        if form_fields
        else None
    )

    display_properties = []
    if display_rules and matching_rule:
        display_properties = matching_rule.get("displayProperties", [])

    # Rule-defined properties first, then any remaining valid predicates.
    ordered_properties = []
    for prop_config in display_properties:
        if prop_config["property"] not in ordered_properties:
            ordered_properties.append(prop_config["property"])
    for prop_uri in valid_predicates_info:
        if prop_uri not in ordered_properties:
            ordered_properties.append(prop_uri)

    for prop_uri in ordered_properties:
        current_prop_config = None
        for prop_config in display_properties:
            if prop_config["property"] == prop_uri:
                current_prop_config = prop_config
                break

        if not current_prop_config:
            # No display configuration for this property: show it as-is.
            process_default_property(prop_uri, triples, grouped_triples, highest_priority_shape)
            continue

        if "displayRules" in current_prop_config:
            is_ordered = "orderedBy" in current_prop_config
            order_property = current_prop_config.get("orderedBy")

            for display_rule_nested in current_prop_config["displayRules"]:
                display_name_nested = display_rule_nested.get("displayName", prop_uri)
                relevant_properties.add(prop_uri)
                object_shape = display_rule_nested.get("shape")
                process_display_rule(
                    display_name_nested,
                    prop_uri,
                    display_rule_nested,
                    subject,
                    triples,
                    grouped_triples,
                    fetched_values_map,
                    historical_snapshot,
                    highest_priority_shape,
                    object_shape,
                )
                # Defensive: make sure the group exists before decorating it.
                entry = grouped_triples.setdefault(
                    display_name_nested,
                    {
                        "property": prop_uri,
                        "triples": [],
                        "subjectShape": highest_priority_shape,
                        "objectShape": object_shape,
                    },
                )

                if is_ordered:
                    entry["is_draggable"] = True
                    entry["ordered_by"] = order_property
                    process_ordering(
                        subject,
                        current_prop_config,
                        order_property,
                        grouped_triples,
                        display_name_nested,
                        fetched_values_map,
                        historical_snapshot,
                    )

                # The nested rule's intermediateRelation overrides the one
                # declared on the property itself.
                if "intermediateRelation" in display_rule_nested:
                    entry["intermediateRelation"] = display_rule_nested["intermediateRelation"]
                elif "intermediateRelation" in current_prop_config:
                    entry["intermediateRelation"] = current_prop_config["intermediateRelation"]
        else:
            display_name_simple = current_prop_config.get("displayName", prop_uri)
            relevant_properties.add(prop_uri)

            # The object shape comes from the first form-field definition.
            current_form_field = matching_form_field.get(prop_uri) if matching_form_field else None
            object_shape = None
            if current_form_field:
                for form_field in current_form_field:
                    object_shape = form_field.get("nodeShape")
                    break

            process_display_rule(
                display_name_simple,
                prop_uri,
                current_prop_config,
                subject,
                triples,
                grouped_triples,
                fetched_values_map,
                historical_snapshot,
                highest_priority_shape,
                object_shape,
            )
            # Defensive: make sure the group exists before decorating it.
            entry = grouped_triples.setdefault(
                display_name_simple,
                {
                    "property": prop_uri,
                    "triples": [],
                    "subjectShape": highest_priority_shape,
                    "objectShape": current_prop_config.get("shape"),
                },
            )

            if "orderedBy" in current_prop_config:
                entry["is_draggable"] = True
                entry["ordered_by"] = current_prop_config.get("orderedBy")
                # process_ordering takes seven parameters; the subject shape
                # is not one of them, so it must not be passed here.
                process_ordering(
                    subject,
                    current_prop_config,
                    current_prop_config.get("orderedBy"),
                    grouped_triples,
                    display_name_simple,
                    fetched_values_map,
                    historical_snapshot,
                )
            if "intermediateRelation" in current_prop_config:
                entry["intermediateRelation"] = current_prop_config["intermediateRelation"]

    return OrderedDict(grouped_triples), relevant_properties

363 

364 

def process_display_rule(
    display_name,
    prop_uri,
    rule,
    subject,
    triples,
    grouped_triples,
    fetched_values_map,
    historical_snapshot=None,
    subject_shape=None,
    object_shape=None,
):
    """
    Collect the triples of one property into its display group.

    Creates ``grouped_triples[display_name]`` when missing, then appends one
    entry per triple whose predicate equals ``prop_uri``. When the rule has a
    truthy "fetchValueFromQuery", the displayed value comes from that query
    (run on the historical snapshot when one is given, otherwise through
    execute_sparql_query); triples for which the query yields no result are
    not added. rdf:type objects are rendered through the custom filter's
    human_readable_class.

    Args:
        display_name: Key of the group in grouped_triples
        prop_uri: Predicate URI to select
        rule: Display rule (or nested display rule) for the property
        subject: Subject URI, passed to the value queries
        triples: Iterable of (subject, predicate, object) triples
        grouped_triples: Mapping updated in place
        fetched_values_map: Updated in place: fetched value -> original object
        historical_snapshot: Optional graph used instead of the live endpoint
        subject_shape: Shape recorded as "subjectShape"
        object_shape: Shape recorded as "objectShape"
    """
    rdf_type = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type'

    if display_name not in grouped_triples:
        grouped_triples[display_name] = {
            "property": prop_uri,
            "triples": [],
            "subjectShape": subject_shape,
            "objectShape": object_shape,
            "intermediateRelation": rule.get("intermediateRelation"),
        }

    for triple in triples:
        if str(triple[1]) != prop_uri:
            continue

        raw_object = triple[2]
        value_query = rule.get("fetchValueFromQuery")

        if value_query:
            if historical_snapshot:
                result, external_entity = execute_historical_query(
                    value_query, subject, raw_object, historical_snapshot
                )
            else:
                result, external_entity = execute_sparql_query(
                    value_query, subject, raw_object
                )
            if not result:
                # No fetched value: the triple is left out of the group.
                continue
            fetched_values_map[str(result)] = str(raw_object)
            grouped_triples[display_name]["triples"].append(
                {
                    "triple": (str(triple[0]), str(triple[1]), str(result)),
                    "external_entity": external_entity,
                    "object": str(raw_object),
                    "subjectShape": subject_shape,
                    "objectShape": object_shape,
                }
            )
            continue

        if str(triple[1]) == rdf_type:
            from heritrace.utils.shacl_utils import determine_shape_for_classes
            object_class_shape = determine_shape_for_classes([raw_object])
            shown_value = get_custom_filter().human_readable_class((raw_object, object_class_shape))
        else:
            shown_value = raw_object

        grouped_triples[display_name]["triples"].append(
            {
                "triple": (str(triple[0]), str(triple[1]), shown_value),
                "object": str(raw_object),
                "subjectShape": subject_shape,
                "objectShape": object_shape,
            }
        )

428 

429 

def execute_sparql_query(query: str, subject: str, value: str) -> Tuple[str, str]:
    """
    Run a value-fetching query template against the live SPARQL endpoint.

    The placeholders ``[[subject]]`` and ``[[value]]`` are replaced with the
    URL-decoded subject and value wrapped in angle brackets. Only the first
    result row is used; its values are read in the order of the query's
    projected variables.

    Args:
        query: Query template containing the placeholders
        subject: Subject URI
        value: Object value of the triple being displayed

    Returns:
        (first_value, second_value) from the first row; missing positions
        are None, and (None, None) is returned when there are no rows.
    """
    sparql = get_sparql()

    bound_query = query.replace("[[subject]]", f"<{unquote(subject)}>")
    bound_query = bound_query.replace("[[value]]", f"<{unquote(value)}>")

    sparql.setQuery(bound_query)
    sparql.setReturnFormat(JSON)
    bindings = sparql.query().convert().get("results", {}).get("bindings", [])
    if not bindings:
        return None, None

    # The projection order of the query decides which value comes first.
    projected_vars = translateQuery(parseQuery(bound_query)).algebra["PV"]
    first_row = bindings[0]
    values = [
        first_row.get(str(var_name), {}).get("value", None)
        for var_name in projected_vars
    ]
    # Pad so that both positions can always be read.
    padded = values + [None, None]
    return (padded[0], padded[1])

453 

454 

def process_ordering(
    subject,
    prop,
    order_property,
    grouped_triples,
    display_name,
    fetched_values_map,
    historical_snapshot: ConjunctiveGraph | Graph | None = None,
):
    """
    Sort the triples of a display group following a linked-list ordering.

    The objects of ``prop['property']`` are expected to be chained through
    ``order_property`` (each entity points to the next one). The chains are
    read from the historical snapshot when one is given, otherwise from the
    live SPARQL endpoint, and ``grouped_triples[display_name]["triples"]``
    is sorted in place accordingly. Entries that do not belong to a chain
    keep their relative order after the chained ones.

    Args:
        subject: Subject URI (URL-decoded before being used in the query)
        prop: Property configuration; only ``prop['property']`` is read
        order_property: Predicate linking an entity to the next one
        grouped_triples: Mapping of display groups, updated in place
        display_name: Key of the group to sort
        fetched_values_map: Maps displayed values back to the original
            object values, which are the ones appearing in the chains
        historical_snapshot: Optional graph queried instead of the endpoint
    """

    def get_ordered_sequence(order_results):
        # entity -> next entity (None at the end of a chain)
        order_map = {}
        for res in order_results:
            if isinstance(res, dict):  # For live triplestore results
                ordered_entity = res["orderedEntity"]["value"]
                next_value = res["nextValue"]["value"]
            else:  # For historical snapshot results
                ordered_entity = str(res[0])
                next_value = str(res[1])

            order_map[str(ordered_entity)] = (
                None if str(next_value) == "NONE" else str(next_value)
            )

        all_sequences = []
        # Chain heads are entities nobody points to. Sorted so that the
        # result does not depend on set iteration order.
        start_elements = sorted(set(order_map.keys()) - set(order_map.values()))
        for current_element in start_elements:
            sequence = []
            visited = set()
            # The visited guard stops the walk if corrupted data makes the
            # chain loop back on itself.
            while current_element in order_map and current_element not in visited:
                visited.add(current_element)
                sequence.append(current_element)
                current_element = order_map[current_element]
            all_sequences.append(sequence)
        return all_sequences

    decoded_subject = unquote(subject)

    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{decoded_subject}> <{prop['property']}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """
    if historical_snapshot:
        order_results = list(historical_snapshot.query(order_query))
    else:
        # The endpoint is only needed when there is no snapshot to query.
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        order_results = sparql.query().convert().get("results", {}).get("bindings", [])

    unordered_rank = float("inf")
    for sequence in get_ordered_sequence(order_results):
        # Position lookup table instead of repeated list.index() calls.
        positions = {entity: index for index, entity in enumerate(sequence)}

        def sort_key(entry, positions=positions):
            shown_value = str(entry["triple"][2])
            original_value = fetched_values_map.get(shown_value, shown_value)
            return positions.get(original_value, unordered_rank)

        grouped_triples[display_name]["triples"].sort(key=sort_key)

521 

522 

def process_default_property(prop_uri, triples, grouped_triples, subject_shape=None):
    """
    Group the triples of a property that has no display configuration.

    The property URI itself is used as the group key. Any existing group
    under that key is replaced, and every triple whose predicate equals
    ``prop_uri`` is added with all three terms converted to strings.

    Args:
        prop_uri: Predicate URI, also used as the display name
        triples: Iterable of (subject, predicate, object) triples
        grouped_triples: Mapping updated in place
        subject_shape: Shape recorded as "subjectShape"; "objectShape" is
            always None for default properties
    """
    group = {
        "property": prop_uri,
        "triples": [],
        "subjectShape": subject_shape,
        "objectShape": None,
    }
    grouped_triples[prop_uri] = group

    for triple in triples:
        if not (str(triple[1]) == prop_uri):
            continue
        object_text = str(triple[2])
        group["triples"].append(
            {
                "triple": (str(triple[0]), str(triple[1]), object_text),
                "object": object_text,
                "subjectShape": subject_shape,
                "objectShape": None,
            }
        )

540 

541 

def execute_historical_query(
    query: str, subject: str, value: str, historical_snapshot: Graph
) -> Tuple[Optional[str], Optional[str]]:
    """
    Run a value-fetching query template against a historical snapshot.

    The placeholders ``[[subject]]`` and ``[[value]]`` are replaced with the
    URL-decoded subject and value wrapped in angle brackets. Only the first
    result row is used.

    Args:
        query: Query template containing the placeholders
        subject: Subject URI
        value: Object value of the triple being displayed
        historical_snapshot: Graph on which the query is executed

    Returns:
        (first_value, second_value) of the first row as strings. A position
        that is missing from the row (query projecting a single variable) or
        unbound is None, matching execute_sparql_query. (None, None) when the
        query yields no rows.
    """
    decoded_subject = unquote(subject)
    decoded_value = unquote(value)
    query = query.replace("[[subject]]", f"<{decoded_subject}>")
    query = query.replace("[[value]]", f"<{decoded_value}>")
    results = historical_snapshot.query(query)
    if results:
        for row in results:
            cells = []
            for position in (0, 1):
                # Guard against one-column rows and unbound variables so the
                # caller never receives the literal string "None".
                cell = row[position] if len(row) > position else None
                cells.append(str(cell) if cell is not None else None)
            return (cells[0], cells[1])
    return None, None

554 

555 

def get_property_order_from_rules(highest_priority_class: str, shape_uri: str = None):
    """
    Return the property URIs of an entity type in display-rule order.

    When a shape is given, the rule is first looked up with both class and
    shape; if that finds nothing (or no shape is given) the lookup is
    repeated with the class alone.

    Args:
        highest_priority_class: The highest priority class for the entity
        shape_uri: Optional shape URI for the entity

    Returns:
        List of property URIs taken from the rule's "displayProperties", in
        order; empty when there are no display rules, no class, or no
        matching rule.
    """
    display_rules = get_display_rules()
    if not display_rules or not highest_priority_class:
        return []

    selected_rule = None
    if shape_uri:
        selected_rule = find_matching_rule(highest_priority_class, shape_uri, display_rules)
    if not selected_rule:
        # Fall back to a lookup by class only.
        selected_rule = find_matching_rule(highest_priority_class, None, display_rules)
    if not selected_rule:
        return []

    return [
        entry["property"]
        for entry in selected_rule.get("displayProperties", [])
        if isinstance(entry, dict) and "property" in entry
    ]

595 

596 

def get_similarity_properties(entity_key: Tuple[str, str]) -> Optional[List[Union[str, Dict[str, List[str]]]]]:
    """Gets the similarity properties configuration for a given entity key.

    This configuration specifies which properties should be used for similarity matching
    using a list-based structure supporting OR logic between elements and
    nested AND logic within elements.

    Example structures:
        - ['prop1', 'prop2']                       # prop1 OR prop2
        - [{'and': ['prop3', 'prop4']}]            # prop3 AND prop4
        - ['prop1', {'and': ['prop2', 'prop3']}]   # prop1 OR (prop2 AND prop3)

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        A list where each element is either a property URI string or a dictionary
        {'and': [list_of_property_uris]}, representing the boolean logic.
        Returns None if no configuration is found or if the structure is invalid;
        a warning is printed only in the invalid case.
    """
    class_uri = entity_key[0]
    shape_uri = entity_key[1]

    # Find the matching rule
    rule = find_matching_rule(class_uri, shape_uri)
    if not rule:
        return None

    similarity_props = rule.get("similarity_properties")

    # A rule that simply does not configure similarity matching is not an
    # error, so no warning is emitted for it.
    if similarity_props is None:
        return None

    if not similarity_props or not isinstance(similarity_props, list):
        print(f"Warning: Invalid format for similarity_properties in class {class_uri}")
        return None

    # Suffix identifying the entity type in the warnings below.
    location = (
        (f" for class {class_uri}" if class_uri else "")
        + (f" with shape {shape_uri}" if shape_uri else "")
    )

    # Validate each element in the list; a single invalid element rejects
    # the whole configuration.
    validated_props = []
    for item in similarity_props:
        if isinstance(item, str):
            validated_props.append(item)
            continue

        if isinstance(item, dict) and len(item) == 1 and "and" in item:
            and_list = item["and"]
            if isinstance(and_list, list) and and_list and all(isinstance(p, str) for p in and_list):
                validated_props.append(item)
                continue
            print(
                "Warning: Invalid 'and' group in similarity_properties"
                + location
                + ". Expected {'and': ['prop_uri', ...]} with a non-empty list of strings."
            )
            return None  # Invalid 'and' group structure

        print(
            "Warning: Invalid item format in similarity_properties list"
            + location
            + ". Expected a property URI string or {'and': [...]} dict."
        )
        return None  # Invalid item type

    return validated_props if validated_props else None