Coverage for heritrace/utils/display_rules_utils.py: 96%

313 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-08-01 22:12 +0000

1from collections import OrderedDict 

2from typing import Dict, List, Optional, Tuple, Union 

3from urllib.parse import unquote 

4 

5from heritrace.extensions import (get_custom_filter, get_display_rules, 

6 get_form_fields, get_sparql) 

7from rdflib import ConjunctiveGraph, Graph, Literal, URIRef 

8from rdflib.plugins.sparql.algebra import translateQuery 

9from rdflib.plugins.sparql.parser import parseQuery 

10from SPARQLWrapper import JSON 

11 

12 

13display_rules = get_display_rules() 

14 

15 

def find_matching_rule(class_uri=None, shape_uri=None, rules=None):
    """
    Find the most appropriate rule for a given class and/or shape.
    At least one of class_uri or shape_uri must be provided.

    Selection order:
        1. A rule whose target names both the given class and the given shape
           is returned immediately.
        2. Otherwise the best class-only rule and the best shape-only rule are
           tracked independently (lower "priority" number wins, the earliest
           rule wins ties); the shape rule is preferred when its priority is
           lower than or equal to the class rule's priority.

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        rules: Optional list of rules to search in, defaults to the rules
            returned by get_display_rules()

    Returns:
        The matching rule or None if no match is found
    """
    if not rules:
        rules = get_display_rules()
        if not rules:
            return None

    # Nothing to match against: no rule can be selected.
    if not class_uri and not shape_uri:
        return None

    wanted_class = str(class_uri) if class_uri else None
    wanted_shape = str(shape_uri) if shape_uri else None

    # Each candidate keeps its own best priority. Sharing a single threshold
    # between the two kinds lets a shape match reset the bar for class rules
    # (and vice versa), which can replace a better candidate with a worse one.
    class_match = None
    shape_match = None

    for rule in rules:
        target = rule["target"]
        rule_priority = rule.get("priority", 0)
        targets_class = "class" in target
        targets_shape = "shape" in target

        class_hit = wanted_class is not None and targets_class and target["class"] == wanted_class
        shape_hit = wanted_shape is not None and targets_shape and target["shape"] == wanted_shape

        # Case 1: both class and shape match - exact match always wins
        if class_hit and shape_hit:
            return rule

        # Case 2: class-only rule
        if class_hit and not targets_shape:
            if class_match is None or rule_priority < class_match.get("priority", 0):
                class_match = rule

        # Case 3: shape-only rule
        elif shape_hit and not targets_class:
            if shape_match is None or rule_priority < shape_match.get("priority", 0):
                shape_match = rule

    # Shape rules are more specific, so they win when priorities are equal
    if shape_match and (
        class_match is None
        or shape_match.get("priority", 0) <= class_match.get("priority", 0)
    ):
        return shape_match
    if class_match:
        return class_match

    return None

73 

74 

def get_class_priority(entity_key):
    """
    Look up the display priority of an entity key.

    The priority is read from the display rule selected by find_matching_rule;
    a matching rule without an explicit "priority" counts as 0. Entity keys
    with no matching rule get float('inf'), i.e. the lowest priority.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The numeric priority (lower means more important)
    """
    class_uri, shape_uri = entity_key[0], entity_key[1]
    matched = find_matching_rule(class_uri, shape_uri)
    if not matched:
        return float('inf')
    return matched.get("priority", 0)

89 

90 

def is_entity_type_visible(entity_key):
    """
    Tell whether entities of the given type should be displayed.

    The answer comes from the "shouldBeDisplayed" flag of the rule selected
    by find_matching_rule; types without a rule, or whose rule omits the
    flag, are visible.

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        The rule's "shouldBeDisplayed" value, or True when unspecified
    """
    class_uri, shape_uri = entity_key[0], entity_key[1]
    matched = find_matching_rule(class_uri, shape_uri)
    if not matched:
        return True
    return matched.get("shouldBeDisplayed", True)

103 

104 

def get_sortable_properties(entity_key: Tuple[str, str]) -> List[Dict[str, str]]:
    """
    Gets the sortable properties from display rules for an entity type and/or shape.
    Infers the sorting type from the form fields.

    Each entry of the rule's "sortableBy" list is copied and enriched with:
        - "displayName": taken from the first nested display rule of the
          matching display property, else from the display property itself,
          else the property URI (only set when a display property matches)
        - "sortType": derived via determine_sort_type from the first form
          field definition registered for (entity_key, property); "string"
          when nothing is registered

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        List of dictionaries with sorting information
    """
    rules = get_display_rules()
    if not rules:
        return []

    form_fields = get_form_fields()

    class_uri = entity_key[0]
    shape_uri = entity_key[1]

    rule = find_matching_rule(class_uri, shape_uri, rules)
    if not rule or "sortableBy" not in rule:
        return []

    # A rule may declare sortableBy without displayProperties; treat that as
    # "no display names available" instead of failing with KeyError.
    display_properties = rule.get("displayProperties", [])
    fields_for_entity = (form_fields.get(entity_key) or {}) if form_fields else {}

    sort_props = []
    for sort_config in rule["sortableBy"]:
        prop = sort_config.copy()
        property_uri = prop["property"]

        for display_prop in display_properties:
            if display_prop["property"] == property_uri:
                fallback_name = display_prop.get("displayName", property_uri)
                nested_rules = display_prop.get("displayRules")
                if nested_rules:
                    # The first nested rule provides the label; tolerate a
                    # nested rule that does not define one.
                    prop["displayName"] = nested_rules[0].get("displayName", fallback_name)
                else:
                    prop["displayName"] = fallback_name
                break

        # Default to string sorting
        prop["sortType"] = "string"

        # Refine the sort type from the first field definition, if any
        field_definitions = fields_for_entity.get(property_uri)
        if field_definitions:
            prop["sortType"] = determine_sort_type(field_definitions[0])

        sort_props.append(prop)

    return sort_props

158 

159 

def determine_sort_type(field_info):
    """
    Map a form field definition to a sort type.

    Fields with a "nodeShape" point to other entities and are sorted as
    strings. Otherwise the first entry of "datatypes" is lower-cased and
    searched for type markers, checked in this order: date, number, boolean.
    Anything else sorts as a string.

    Args:
        field_info: Field definition dictionary

    Returns:
        One of "string", "date", "number", "boolean"
    """
    if field_info.get("nodeShape"):
        return "string"

    datatypes = field_info.get("datatypes")
    if not datatypes:
        return "string"

    first_datatype = str(datatypes[0]).lower()
    markers_by_sort_type = (
        ("date", ("date", "time")),
        ("number", ("int", "float", "decimal", "double", "number")),
        ("boolean", ("boolean",)),
    )
    for sort_type, markers in markers_by_sort_type:
        for marker in markers:
            if marker in first_datatype:
                return sort_type

    return "string"

179 

180 

def get_highest_priority_class(subject_classes):
    """
    Pick the class with the best (numerically lowest) display priority.

    Every class is paired with the shape returned by
    determine_shape_for_classes and ranked with get_class_priority. The first
    class holding the lowest priority wins; when no class ranks below
    float('inf'), the first class of the list is returned.

    Args:
        subject_classes: List of class URIs

    Returns:
        The selected class URI as a string, or None if no classes are provided
    """
    from heritrace.utils.shacl_utils import determine_shape_for_classes

    if not subject_classes:
        return None

    best_class = None
    best_priority = float('inf')

    for candidate in map(str, subject_classes):
        candidate_shape = determine_shape_for_classes([candidate])
        candidate_priority = get_class_priority((candidate, candidate_shape))
        if candidate_priority < best_priority:
            best_class, best_priority = candidate, candidate_priority

    if best_class is None:
        # No class has a usable priority: fall back to the first one
        return str(subject_classes[0])

    return best_class

212 

213 

def get_grouped_triples(
    subject: URIRef,
    triples: List[Tuple[URIRef, URIRef, URIRef|Literal]],
    valid_predicates_info: List[str],
    historical_snapshot: Optional[Graph] = None,
    highest_priority_class: Optional[str] = None,
    highest_priority_shape: Optional[str] = None
) -> Tuple[OrderedDict, set]:
    """
    This function groups the triples based on the display rules.
    It also fetches the values for the properties that are configured to be fetched from the query.

    Args:
        subject: The subject URI
        triples: List of triples for the subject
        valid_predicates_info: List of valid predicates for the subject
        historical_snapshot: Optional historical snapshot graph
        highest_priority_class: The highest priority class URI for the subject
        highest_priority_shape: The highest priority shape URI for the subject

    Returns:
        Tuple of (grouped triples, relevant properties). The map of fetched
        values is only used internally to order the groups and is not returned.

    Note:
        relevant_properties contains all properties that should be considered
        "relevant" for UI operations (adding/deleting). This includes:
        - Properties configured in display rules when rules exist and match
        - ALL valid properties when no display rules exist or no rules match
        This ensures users can always interact with entities even without display rules.
    """
    display_rules = get_display_rules()
    form_fields = get_form_fields()

    grouped_triples = OrderedDict()
    relevant_properties = set()
    # Map of values returned by fetchValueFromQuery to the original objects;
    # filled by process_display_rule and read by process_ordering.
    fetched_values_map = dict()

    matching_rule = find_matching_rule(highest_priority_class, highest_priority_shape, display_rules)
    # Form fields may be unavailable; in that case no object shape is inferred.
    matching_form_field = (
        form_fields.get((highest_priority_class, highest_priority_shape))
        if form_fields
        else None
    )

    rule_applies = bool(display_rules and matching_rule)
    display_properties = matching_rule.get("displayProperties", []) if rule_applies else []

    def ensure_group(display_name, prop_uri, object_shape):
        """Create the group for display_name if it is missing and return it."""
        if display_name not in grouped_triples:
            grouped_triples[display_name] = {
                "property": prop_uri,
                "triples": [],
                "subjectClass": highest_priority_class,
                "subjectShape": highest_priority_shape,
                "objectShape": object_shape,
            }
        return grouped_triples[display_name]

    # Properties configured in the rule come first (in rule order), followed
    # by the remaining valid predicates.
    ordered_properties = []
    for prop_config in display_properties:
        if prop_config["property"] not in ordered_properties:
            ordered_properties.append(prop_config["property"])
    for prop_uri in valid_predicates_info:
        if prop_uri not in ordered_properties:
            ordered_properties.append(prop_uri)

    for prop_uri in ordered_properties:
        # First display property configured for this predicate, if any
        current_prop_config = None
        for prop_config in display_properties:
            if prop_config["property"] == prop_uri:
                current_prop_config = prop_config
                break

        if not current_prop_config:
            # No rules, no matching rule, or property without specific
            # configuration: show it with the default layout.
            relevant_properties.add(prop_uri)
            process_default_property(
                prop_uri, triples, grouped_triples, highest_priority_shape, highest_priority_class
            )
            continue

        if "displayRules" in current_prop_config:
            is_ordered = "orderedBy" in current_prop_config
            order_property = current_prop_config.get("orderedBy")

            for display_rule_nested in current_prop_config["displayRules"]:
                display_name_nested = display_rule_nested.get("displayName", prop_uri)
                relevant_properties.add(prop_uri)
                object_shape = display_rule_nested.get("shape")
                process_display_rule(
                    display_name_nested,
                    prop_uri,
                    display_rule_nested,
                    subject,
                    triples,
                    grouped_triples,
                    fetched_values_map,
                    historical_snapshot,
                    highest_priority_shape,
                    object_shape,
                    highest_priority_class
                )

                # Ensure the grouped_triples entry exists before it is updated
                group = ensure_group(display_name_nested, prop_uri, object_shape)

                if is_ordered:
                    group["is_draggable"] = True
                    group["ordered_by"] = order_property
                    process_ordering(
                        subject,
                        current_prop_config,
                        order_property,
                        grouped_triples,
                        display_name_nested,
                        fetched_values_map,
                        historical_snapshot,
                    )

                # The nested rule's intermediateRelation takes precedence over
                # the one declared on the property configuration.
                if "intermediateRelation" in display_rule_nested:
                    group["intermediateRelation"] = display_rule_nested["intermediateRelation"]
                elif "intermediateRelation" in current_prop_config:
                    group["intermediateRelation"] = current_prop_config["intermediateRelation"]
        else:
            display_name_simple = current_prop_config.get("displayName", prop_uri)
            relevant_properties.add(prop_uri)

            # The object shape comes from the first form field definition
            current_form_field = matching_form_field.get(prop_uri) if matching_form_field else None
            object_shape = None
            if current_form_field:
                object_shape = next(iter(current_form_field)).get("nodeShape")

            process_display_rule(
                display_name_simple,
                prop_uri,
                current_prop_config,
                subject,
                triples,
                grouped_triples,
                fetched_values_map,
                historical_snapshot,
                highest_priority_shape,
                object_shape,
                highest_priority_class
            )
            if "orderedBy" in current_prop_config:
                group = ensure_group(display_name_simple, prop_uri, current_prop_config.get("shape"))
                group["is_draggable"] = True
                group["ordered_by"] = current_prop_config.get("orderedBy")
                # process_ordering accepts seven arguments; passing the subject
                # shape as an eighth one raised TypeError.
                process_ordering(
                    subject,
                    current_prop_config,
                    current_prop_config.get("orderedBy"),
                    grouped_triples,
                    display_name_simple,
                    fetched_values_map,
                    historical_snapshot,
                )
            if "intermediateRelation" in current_prop_config:
                group = ensure_group(display_name_simple, prop_uri, current_prop_config.get("shape"))
                group["intermediateRelation"] = current_prop_config["intermediateRelation"]

    return grouped_triples, relevant_properties

381 

382 

def process_display_rule(
    display_name,
    prop_uri,
    rule,
    subject,
    triples,
    grouped_triples,
    fetched_values_map,
    historical_snapshot=None,
    subject_shape=None,
    object_shape=None,
    subject_class=None,
):
    """
    Add the triples of one property to the group named display_name.

    The group is created in grouped_triples when missing. For every triple
    whose predicate equals prop_uri:
        - if the rule defines "fetchValueFromQuery", the query is run (against
          the historical snapshot when one is given, otherwise against the
          live endpoint); the triple is added only when the query yields a
          value, which replaces the object in the displayed triple and is
          recorded in fetched_values_map (fetched value -> original object)
        - otherwise the object is shown as is, except for rdf:type objects,
          which are converted with the custom filter's human_readable_class

    Args:
        display_name: Key of the group in grouped_triples
        prop_uri: Predicate URI handled by this call
        rule: Display rule (property configuration or nested display rule)
        subject: The subject URI
        triples: All triples of the subject
        grouped_triples: Mapping updated in place
        fetched_values_map: Mapping updated in place
        historical_snapshot: Optional snapshot graph to query instead of the endpoint
        subject_shape: Shape of the subject, copied into the output
        object_shape: Shape of the objects, copied into the output
        subject_class: Class of the subject, copied into the output
    """
    if display_name not in grouped_triples:
        grouped_triples[display_name] = {
            "property": prop_uri,
            "triples": [],
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": object_shape,
            "intermediateRelation": rule.get("intermediateRelation"),
        }

    query_template = rule.get("fetchValueFromQuery")

    for triple in triples:
        if not (str(triple[1]) == prop_uri):
            continue

        object_uri = str(triple[2])

        if query_template:
            # Compare with None: a snapshot graph containing no triples is
            # falsy, and a truthiness test would silently query the live
            # endpoint instead of the snapshot.
            if historical_snapshot is not None:
                result, external_entity = execute_historical_query(
                    query_template,
                    subject,
                    triple[2],
                    historical_snapshot,
                )
            else:
                result, external_entity = execute_sparql_query(
                    query_template, subject, triple[2]
                )
            if not result:
                # The query produced no value for this object: nothing to show
                continue

            fetched_values_map[str(result)] = object_uri
            new_triple_data = {
                "triple": (str(triple[0]), str(triple[1]), str(result)),
                "external_entity": external_entity,
                "object": object_uri,
                "subjectClass": subject_class,
                "subjectShape": subject_shape,
                "objectShape": object_shape,
            }
        else:
            if str(triple[1]) == 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type':
                from heritrace.utils.shacl_utils import determine_shape_for_classes
                object_class_shape = determine_shape_for_classes([triple[2]])
                result = get_custom_filter().human_readable_class((triple[2], object_class_shape))
            else:
                result = triple[2]

            new_triple_data = {
                "triple": (str(triple[0]), str(triple[1]), result),
                "object": object_uri,
                "subjectClass": subject_class,
                "subjectShape": subject_shape,
                "objectShape": object_shape,
            }

        grouped_triples[display_name]["triples"].append(new_triple_data)

450 

451 

def execute_sparql_query(query: str, subject: str, value: str) -> Tuple[str, str]:
    """
    Run a display-rule query against the live SPARQL endpoint.

    The placeholders [[subject]] and [[value]] are replaced with the
    URL-decoded subject and value wrapped in angle brackets. Only the first
    result row is used; its values are read in the order of the query's
    projected variables.

    Args:
        query: SPARQL query template
        subject: The subject URI
        value: The object value of the triple being displayed

    Returns:
        Tuple with the first and second projected values of the first row;
        missing values are None, and (None, None) is returned when the query
        has no results
    """
    endpoint = get_sparql()

    subject_term = f"<{unquote(subject)}>"
    value_term = f"<{unquote(value)}>"
    query = query.replace("[[subject]]", subject_term).replace("[[value]]", value_term)

    endpoint.setQuery(query)
    endpoint.setReturnFormat(JSON)
    response = endpoint.query().convert()
    bindings = response.get("results", {}).get("bindings", [])
    if not bindings:
        return None, None

    # The parsed query gives the projected variables in declaration order
    projected_variables = translateQuery(parseQuery(query)).algebra["PV"]
    first_row = bindings[0]
    row_values = []
    for variable in projected_variables:
        row_values.append(first_row.get(str(variable), {}).get("value", None))

    # Pad so that queries projecting fewer than two variables yield None
    first_value, second_value = (row_values + [None, None])[:2]
    return (first_value, second_value)

475 

476 

def process_ordering(
    subject,
    prop,
    order_property,
    grouped_triples,
    display_name,
    fetched_values_map,
    historical_snapshot: ConjunctiveGraph | Graph | None = None,
):
    """
    Sort the triples of a group following the chain defined by order_property.

    The objects of <subject> <prop['property']> are queried together with
    their successor (the value of order_property); chains are rebuilt from
    these links and grouped_triples[display_name]["triples"] is sorted in
    place once per chain. Triples whose object is not part of the chain keep
    their relative order and go after it.

    Args:
        subject: The subject URI
        prop: Property configuration; only prop['property'] is read
        order_property: URI of the predicate linking an entity to the next one
        grouped_triples: Mapping whose group display_name is sorted in place
        display_name: Key of the group to sort
        fetched_values_map: Map of displayed values to original object URIs
        historical_snapshot: Optional graph to query instead of the endpoint
    """
    def get_ordered_sequence(order_results):
        # entity -> next entity (None for the last element of a chain)
        order_map = {}
        for res in order_results:
            if isinstance(res, dict):  # For live triplestore results
                ordered_entity = res["orderedEntity"]["value"]
                next_value = res["nextValue"]["value"]
            else:  # For historical snapshot results
                ordered_entity = str(res[0])
                next_value = str(res[1])

            order_map[str(ordered_entity)] = (
                None if str(next_value) == "NONE" else str(next_value)
            )

        all_sequences = []
        # Chain heads are the entities nobody points to. They are visited in
        # sorted order so that the outcome does not depend on set iteration.
        start_elements = set(order_map) - set(order_map.values())
        for start_element in sorted(start_elements):
            sequence = []
            visited = set()
            current_element = start_element
            # The visited check stops chains that loop back on themselves,
            # which would otherwise never terminate.
            while current_element in order_map and current_element not in visited:
                visited.add(current_element)
                sequence.append(current_element)
                current_element = order_map[current_element]
            all_sequences.append(sequence)
        return all_sequences

    decoded_subject = unquote(subject)

    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{decoded_subject}> <{prop['property']}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """
    # Compare with None: a snapshot graph containing no triples is falsy and
    # must still be queried instead of the live endpoint.
    if historical_snapshot is not None:
        order_results = list(historical_snapshot.query(order_query))
    else:
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        order_results = sparql.query().convert().get("results", {}).get("bindings", [])

    for sequence in get_ordered_sequence(order_results):
        # Position lookup built once per chain instead of list.index per item
        position = {entity: index for index, entity in enumerate(sequence)}

        def sort_key(item, position=position):
            displayed_value = str(item["triple"][2])
            # Displayed values fetched by a query map back to the original URI
            original_value = fetched_values_map.get(displayed_value, displayed_value)
            return position.get(original_value, float("inf"))

        grouped_triples[display_name]["triples"].sort(key=sort_key)

543 

544 

def process_default_property(prop_uri, triples, grouped_triples, subject_shape=None, subject_class=None):
    """
    Group the triples of a property that has no display configuration.

    The group is keyed by the property URI itself and replaces any group
    already stored under that key. Subjects, predicates and objects are
    converted to strings; no object shape is recorded.

    Args:
        prop_uri: Predicate URI to group
        triples: All triples of the subject
        grouped_triples: Mapping updated in place
        subject_shape: Shape of the subject, copied into the output
        subject_class: Class of the subject, copied into the output
    """
    group = {
        "property": prop_uri,
        "triples": [],
        "subjectClass": subject_class,
        "subjectShape": subject_shape,
        "objectShape": None
    }
    grouped_triples[prop_uri] = group

    matching_triples = [item for item in triples if str(item[1]) == prop_uri]
    group["triples"].extend(
        {
            "triple": (str(item[0]), str(item[1]), str(item[2])),
            "object": str(item[2]),
            "subjectClass": subject_class,
            "subjectShape": subject_shape,
            "objectShape": None,
        }
        for item in matching_triples
    )

564 

565 

def execute_historical_query(
    query: str, subject: str, value: str, historical_snapshot: Graph
) -> Tuple[Optional[str], Optional[str]]:
    """
    Run a display-rule query against a historical snapshot.

    The placeholders [[subject]] and [[value]] are replaced with the
    URL-decoded subject and value wrapped in angle brackets. Only the first
    result row is used.

    Args:
        query: SPARQL query template
        subject: The subject URI
        value: The object value of the triple being displayed
        historical_snapshot: Graph to run the query on

    Returns:
        Tuple with the first and second values of the first row as strings.
        A value that is unbound, or a second column that the query does not
        project, is returned as None (matching execute_sparql_query);
        (None, None) is returned when the query has no results.
    """
    decoded_subject = unquote(subject)
    decoded_value = unquote(value)
    query = query.replace("[[subject]]", f"<{decoded_subject}>")
    query = query.replace("[[value]]", f"<{decoded_value}>")

    def as_text(row, index):
        # str(None) would produce the truthy string "None", which callers
        # would then display or treat as an existing external entity.
        if len(row) <= index or row[index] is None:
            return None
        return str(row[index])

    for row in historical_snapshot.query(query):
        return (as_text(row, 0), as_text(row, 1))
    return None, None

578 

579 

def get_property_order_from_rules(highest_priority_class: str, shape_uri: str = None):
    """
    List the properties of an entity type in display-rule order.

    The rule is selected with find_matching_rule; entries of its
    "displayProperties" that are not dictionaries or lack a "property" key
    are skipped.

    Args:
        highest_priority_class: The highest priority class for the entity
        shape_uri: Optional shape URI for the entity

    Returns:
        List of property URIs in the order specified by display rules; empty
        when no class is given or no rule matches
    """
    if not highest_priority_class:
        return []

    matched = find_matching_rule(highest_priority_class, shape_uri)
    if not matched:
        return []

    return [
        entry["property"]
        for entry in matched.get("displayProperties", [])
        if isinstance(entry, dict) and "property" in entry
    ]

604 

605 

def get_predicate_ordering_info(predicate_uri: str, highest_priority_class: str, entity_shape: str = None) -> Optional[str]:
    """
    Return the ordering property configured for a predicate, if any.

    Only the first display property (dictionary entry) whose "property"
    equals predicate_uri is considered.

    Args:
        predicate_uri: URI of the predicate to check
        highest_priority_class: The highest priority class for the subject entity
        entity_shape: Optional shape for the subject entity

    Returns:
        The "orderedBy" value of that display property, or None when there
        are no display rules, no rule matches, the predicate is not
        configured, or it is not ordered
    """
    rules = get_display_rules()
    if not rules:
        return None

    matched = find_matching_rule(highest_priority_class, entity_shape, rules)
    if not matched:
        return None

    configured = next(
        (
            entry
            for entry in matched.get("displayProperties", [])
            if isinstance(entry, dict) and entry.get("property") == predicate_uri
        ),
        None,
    )
    if configured is None:
        return None
    return configured.get("orderedBy")

631 

632 

def get_shape_order_from_display_rules(highest_priority_class: str, entity_shape: str, predicate_uri: str) -> list:
    """
    Collect the shapes declared for a predicate, in display-rule order.

    The first display property that both targets predicate_uri and defines
    "displayRules" is used; nested rules without a shape are ignored.

    Args:
        highest_priority_class: The highest priority class for the entity
        entity_shape: The shape for the subject entity
        predicate_uri: The predicate URI to get shape ordering for

    Returns:
        List of shape URIs in the order specified in displayRules, or empty list if no rules found
    """
    rules = get_display_rules()
    if not rules:
        return []

    matched = find_matching_rule(highest_priority_class, entity_shape, rules)
    if not matched or "displayProperties" not in matched:
        return []

    for entry in matched["displayProperties"]:
        if entry["property"] == predicate_uri and "displayRules" in entry:
            shapes = []
            for nested_rule in entry["displayRules"]:
                shape = nested_rule.get("shape")
                if shape:
                    shapes.append(shape)
            return shapes

    return []

660 

661 

def get_similarity_properties(entity_key: Tuple[str, str]) -> Optional[List[Union[str, Dict[str, List[str]]]]]:
    """Return the validated similarity configuration of an entity key.

    The configuration lists the properties used for similarity matching.
    Elements of the list are alternatives (OR); a dictionary element with the
    single key 'and' groups properties that must all match (AND).

    Example structures:
        - ['prop1', 'prop2']                      # prop1 OR prop2
        - [{'and': ['prop3', 'prop4']}]           # prop3 AND prop4
        - ['prop1', {'and': ['prop2', 'prop3']}]  # prop1 OR (prop2 AND prop3)

    Args:
        entity_key: A tuple (class_uri, shape_uri)

    Returns:
        A list where each element is either a property URI string or a dictionary
        {'and': [list_of_property_uris]}. None is returned when no rule matches,
        when "similarity_properties" is missing, empty or not a list, or when any
        element is malformed (a warning is printed in that last case).
    """
    class_uri, shape_uri = entity_key[0], entity_key[1]

    rule = find_matching_rule(class_uri, shape_uri)
    if not rule:
        return None

    configured = rule.get("similarity_properties")
    if not configured or not isinstance(configured, list):
        return None

    def warn(problem, expectation):
        # Mention the class and the shape only when they are set
        context = ""
        if class_uri:
            context += f" for class {class_uri}"
        if shape_uri:
            context += f" with shape {shape_uri}"
        print(problem + context + expectation)

    validated = []
    for element in configured:
        if isinstance(element, str):
            validated.append(element)
            continue

        is_and_group = isinstance(element, dict) and len(element) == 1 and "and" in element
        if not is_and_group:
            warn(
                "Warning: Invalid item format in similarity_properties list",
                ". Expected a property URI string or {'and': [...]} dict.",
            )
            return None  # Invalid item type

        members = element["and"]
        members_valid = (
            isinstance(members, list)
            and members
            and all(isinstance(member, str) for member in members)
        )
        if not members_valid:
            warn(
                "Warning: Invalid 'and' group in similarity_properties",
                ". Expected {'and': ['prop_uri', ...]} with a non-empty list of strings.",
            )
            return None  # Invalid 'and' group structure

        validated.append(element)

    return validated or None