Coverage for heritrace / utils / shacl_utils.py: 96%

221 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from collections.abc import Iterable 

6from weakref import WeakKeyDictionary 

7 

8from flask import Flask 

9from rdflib import RDF, Graph 

10from SPARQLWrapper import JSON 

11 

12from heritrace.extensions import get_form_fields, get_shacl_graph, get_sparql 

13from heritrace.sparql import get_sparql_bindings, select_results 

14from heritrace.utils.display_rules_utils import get_class_priority 

15from heritrace.utils.filters import format_uri_as_readable 

16from heritrace.utils.shacl_display import ( 

17 ShaclProcessingContext, 

18 apply_display_rules, 

19 extract_shacl_form_fields, 

20 order_form_fields, 

21 process_nested_shapes, 

22) 

23from heritrace.utils.virtual_properties import get_virtual_properties_for_entity 

24 

# Per-graph memoization of SHACL lookups. The outer mappings are
# WeakKeyDictionary instances keyed by the Graph object, so holding cached
# results for a graph does not by itself keep that graph alive.

# class URI -> URIs of the shapes that target it (see _get_shapes_for_class)
_class_shapes_cache: WeakKeyDictionary[Graph, dict[str, list[str]]] = (
    WeakKeyDictionary()
)
# shape URI -> property URIs declared by the shape (see _get_shape_properties)
_shape_properties_cache: WeakKeyDictionary[Graph, dict[str, set[str]]] = (
    WeakKeyDictionary()
)
# shape URI -> (property URI, required value) pairs
# (see _get_hasvalue_constraints)
_hasvalue_constraints_cache: WeakKeyDictionary[
    Graph, dict[str, list[tuple[str, str]]]
] = WeakKeyDictionary()

34 

35 

def get_form_fields_from_shacl(
    shacl: Graph | None, display_rules: list[dict] | None, app: Flask
) -> dict:
    """
    Build the complete form field configuration from a SHACL graph.

    The pipeline is: extract the raw fields, resolve nested shapes, apply the
    display rules (when provided), fill in missing display names, add the
    virtual properties and finally order everything.

    Args:
        shacl: The SHACL graph; a missing or falsy graph yields an empty dict
        display_rules: The display rules configuration, if any
        app: Flask application instance

    Returns:
        The ordered form fields, keyed by (class, shape) tuples, where each
        value maps a predicate to its list of field descriptions. An empty
        dict is returned when no SHACL graph is available.
    """
    if not shacl:
        return {}

    # Raw fields straight from the SHACL shapes.
    fields = extract_shacl_form_fields(shacl, display_rules, app=app)

    # Resolve nested shapes; the context carries the shared set of shapes
    # that have already been processed.
    context = ShaclProcessingContext(
        shacl=shacl,
        display_rules=display_rules,
        app=app,
        processed_shapes=set(),
    )
    for predicate_map in fields.values():
        for field_list in predicate_map.values():
            for field in field_list:
                if not field.get("nodeShape"):
                    continue
                field["nestedShape"] = process_nested_shapes(
                    context, str(field["nodeShape"])
                )

    # Display rules are optional.
    if display_rules:
        fields = apply_display_rules(shacl, fields, display_rules)

    # Fields not covered by a display rule still need a displayName.
    ensure_display_names(fields)

    # Virtual properties are added before ordering so that they get ordered too.
    with_virtual = add_virtual_properties_to_form_fields_internal(fields)
    return order_form_fields(with_virtual, display_rules)

88 

89 

90def _apply_field_overrides(shape_data: dict, override: dict) -> dict: 

91 nested_field = shape_data.copy() 

92 if "shouldBeDisplayed" in override: 

93 nested_field["shouldBeDisplayed"] = override["shouldBeDisplayed"] 

94 if "displayName" in override: 

95 nested_field["displayName"] = override["displayName"] 

96 if "value" in override: 

97 nested_field["hasValue"] = override["value"] 

98 nested_field["nestedShape"] = [] 

99 return nested_field 

100 

101 

def _build_nested_shape_entry(vp: dict, enhanced_form_fields: dict) -> list[dict]:
    """
    Build the nested field list for a virtual property.

    The target class/shape is read from the property's "implementedVia"
    section. When only a class is configured, the shape is looked up with
    determine_shape_for_classes. The fields of the matching form field entry
    are then copied, applying any "fieldOverrides", and fields that end up
    with a falsy "shouldBeDisplayed" are dropped.

    Args:
        vp: Configuration of the virtual property
        enhanced_form_fields: Form fields to take the nested fields from

    Returns:
        The list of nested field descriptions, empty if no form field entry
        matches the target
    """
    via = vp.get("implementedVia", {})
    target = via.get("target", {})
    target_class = target.get("class")
    target_shape = target.get("shape")

    if target_class and not target_shape:
        target_shape = determine_shape_for_classes([target_class])

    matched_key = find_matching_form_field(
        class_uri=target_class,
        shape_uri=target_shape,
        form_fields=enhanced_form_fields,
    )
    if not matched_key:
        return []

    overrides = via.get("fieldOverrides", {})
    visible_fields: list[dict] = []

    for prop_uri, details_list in enhanced_form_fields.get(matched_key, {}).items():
        for details in details_list:
            candidate = (
                _apply_field_overrides(details, overrides[prop_uri])
                if prop_uri in overrides
                else details.copy()
            )
            # Fields are shown unless explicitly switched off.
            if candidate.get("shouldBeDisplayed", True):
                visible_fields.append(candidate)

    return visible_fields

137 

138 

def add_virtual_properties_to_form_fields_internal(form_fields: dict) -> dict:
    """
    Add an entry for every displayable virtual property to the form fields.

    Args:
        form_fields: Form fields keyed by (class, shape) tuples; a falsy value
            is treated as an empty dictionary

    Returns:
        A shallow copy of form_fields including the virtual properties

    Note:
        Only the outer dictionary is copied, so the per-entity dictionaries are
        shared with the input and receive the virtual property entries as well.
    """
    enhanced = form_fields.copy() if form_fields else {}

    for entity_key, entity_fields in enhanced.items():
        entity_class, entity_shape = entity_key
        virtual_properties = get_virtual_properties_for_entity(
            entity_class, entity_shape
        )

        for display_name, prop_config in virtual_properties or ():
            if not prop_config.get("shouldBeDisplayed", True):
                continue

            nested_fields = _build_nested_shape_entry(prop_config, enhanced)

            # A virtual property is registered under its display name and
            # replaces whatever was stored under that key before.
            entity_fields[display_name] = [
                {
                    "displayName": prop_config.get("displayName", display_name),
                    "uri": display_name,
                    "is_virtual": True,
                    "min": 0,
                    "max": None,
                    "datatypes": [],
                    "optionalValues": [],
                    "orderedBy": None,
                    "nodeShape": None,
                    "subjectClass": None,
                    "subjectShape": None,
                    "objectClass": None,
                    "entityType": None,
                    "nestedShape": nested_fields,
                    "or": None,
                }
            ]

    return enhanced

181 

182 

def _get_shapes_for_class(shacl_graph: Graph, class_uri: str) -> list[str]:
    """
    Return the URIs of the shapes that target the given class.

    Results are memoized per graph and per class URI in _class_shapes_cache.

    Args:
        shacl_graph: The SHACL graph to query
        class_uri: URI of the class

    Returns:
        List of shape URIs declaring sh:targetClass for the class
    """
    known_shapes = _class_shapes_cache.setdefault(shacl_graph, {})
    try:
        return known_shapes[class_uri]
    except KeyError:
        pass

    query_string = f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
            }}
        """
    rows = select_results(shacl_graph.query(query_string))
    known_shapes[class_uri] = [str(row.shape) for row in rows]
    return known_shapes[class_uri]

195 

196 

def determine_shape_for_classes(class_list: list[str]) -> str | None:
    """
    Pick the SHACL shape to use for a list of class URIs.

    All shapes targeting any of the classes are collected and the one with
    the best class priority is chosen.

    Args:
        class_list: Class URIs to look up shapes for

    Returns:
        The selected shape URI, or None when there is no SHACL graph or no
        shape targets any of the classes
    """
    graph = get_shacl_graph()
    if not graph:
        return None

    class_shape_pairs = [
        (class_uri, shape)
        for class_uri in class_list
        for shape in _get_shapes_for_class(graph, class_uri)
    ]
    return _find_highest_priority_shape(class_shape_pairs)

218 

219 

def determine_shape_for_entity_triples(entity_triples: Iterable) -> str | None:
    """
    Determine the most appropriate SHACL shape for an entity based on its triples.

    Uses a multi-criteria scoring system to distinguish between shapes:
    1. sh:hasValue constraint matches (highest priority)
    2. Property matching - number of shape properties present in entity
    3. Class priority - predefined priority ordering

    Args:
        entity_triples: Iterable of triples (subject, predicate, object) for
            the entity; one-shot iterables such as generators are supported

    Returns:
        The most appropriate shape URI, or None if no shapes are found
    """
    shacl_graph = get_shacl_graph()
    if not shacl_graph:
        return None

    # The triples are traversed once here and once more per candidate shape
    # (hasValue check). Materialize them so that a generator is not exhausted
    # after the first pass, which would silently zero the hasValue score.
    triples = list(entity_triples)

    entity_classes = []
    entity_properties = set()

    for _subject, predicate, obj in triples:
        if str(predicate) == str(RDF.type):
            entity_classes.append(str(obj))
        entity_properties.add(str(predicate))

    if not entity_classes:
        return None

    candidate_shapes = [
        (class_uri, shape)
        for class_uri in entity_classes
        for shape in _get_shapes_for_class(shacl_graph, class_uri)
    ]

    if not candidate_shapes:
        return None

    # A single candidate needs no scoring.
    if len(candidate_shapes) == 1:
        return candidate_shapes[0][1]

    shape_scores = {}

    for class_uri, shape_uri in candidate_shapes:
        shape_properties = _get_shape_properties(shacl_graph, shape_uri)
        property_matches = len(entity_properties.intersection(shape_properties))

        hasvalue_matches = _check_hasvalue_constraints(
            shacl_graph, shape_uri, triples
        )

        priority = get_class_priority((class_uri, shape_uri))

        # Tuples compare element by element: hasValue matches first, then
        # property matches, then priority (negated because a lower priority
        # number must rank higher under max()).
        shape_scores[shape_uri] = (hasvalue_matches, property_matches, -priority)

    return max(shape_scores, key=lambda s: shape_scores[s])

281 

282 

283def _find_highest_priority_shape( 

284 class_shape_pairs: list[tuple[str, str]], 

285) -> str | None: 

286 """ 

287 Helper function to find the shape with the highest priority from a list of 

288 (class_uri, shape) pairs. 

289 

290 Args: 

291 class_shape_pairs: List of tuples (class_uri, shape) 

292 

293 Returns: 

294 The shape with the highest priority, or None if the list is empty 

295 """ 

296 highest_priority = float("inf") 

297 highest_priority_shape = None 

298 

299 for class_uri, shape in class_shape_pairs: 

300 entity_key = (class_uri, shape) 

301 priority = get_class_priority(entity_key) 

302 if priority < highest_priority: 

303 highest_priority = priority 

304 highest_priority_shape = shape 

305 

306 return highest_priority_shape 

307 

308 

def _get_shape_properties(shacl_graph: Graph, shape_uri: str) -> set:
    """
    Collect the property URIs declared by a SHACL shape.

    Results are memoized per graph and per shape URI in
    _shape_properties_cache.

    Args:
        shacl_graph: The SHACL graph
        shape_uri: URI of the shape to inspect

    Returns:
        Set of the sh:path values of the shape's sh:property entries
    """
    known_properties = _shape_properties_cache.setdefault(shacl_graph, {})
    try:
        return known_properties[shape_uri]
    except KeyError:
        pass

    query_string = f"""
            PREFIX sh: <http://www.w3.org/ns/shacl#>
            SELECT DISTINCT ?property WHERE {{
                <{shape_uri}> sh:property ?propertyShape .
                ?propertyShape sh:path ?property .
            }}
        """
    rows = select_results(shacl_graph.query(query_string))
    known_properties[shape_uri] = {str(row.property) for row in rows}
    return known_properties[shape_uri]

334 

335 

def _get_hasvalue_constraints(
    shacl_graph: Graph, shape_uri: str
) -> list[tuple[str, str]]:
    """
    Collect the sh:hasValue constraints declared by a SHACL shape.

    Results are memoized per graph and per shape URI in
    _hasvalue_constraints_cache.

    Args:
        shacl_graph: The SHACL graph
        shape_uri: URI of the shape to inspect

    Returns:
        List of (property URI, required value) pairs, both as strings
    """
    known_constraints = _hasvalue_constraints_cache.setdefault(shacl_graph, {})
    try:
        return known_constraints[shape_uri]
    except KeyError:
        pass

    query_string = f"""
            PREFIX sh: <http://www.w3.org/ns/shacl#>
            SELECT DISTINCT ?property ?value WHERE {{
                <{shape_uri}> sh:property ?propertyShape .
                ?propertyShape sh:path ?property .
                ?propertyShape sh:hasValue ?value .
            }}
        """
    rows = select_results(shacl_graph.query(query_string))
    known_constraints[shape_uri] = [
        (str(row.property), str(row.value)) for row in rows
    ]
    return known_constraints[shape_uri]

356 

357 

def _check_hasvalue_constraints(
    shacl_graph: Graph, shape_uri: str, entity_triples: Iterable
) -> int:
    """
    Count the sh:hasValue constraints of a shape that the entity satisfies.

    A constraint counts as satisfied when the entity has a triple whose
    predicate and object, compared as strings, equal the constrained property
    and its required value.

    Args:
        shacl_graph: The SHACL graph
        shape_uri: URI of the shape to check
        entity_triples: Triples (subject, predicate, object) of the entity

    Returns:
        Number of hasValue constraints satisfied by the entity
    """
    required_pairs = _get_hasvalue_constraints(shacl_graph, shape_uri)
    if not required_pairs:
        # Nothing to satisfy; the triples are not even looked at.
        return 0

    present_pairs = {
        (str(predicate), str(obj)) for _, predicate, obj in entity_triples
    }
    return sum(
        (property_uri, required_value) in present_pairs
        for property_uri, required_value in required_pairs
    )

389 

390 

def ensure_display_names(form_fields: dict) -> None:
    """
    Give every form field a displayName, modifying form_fields in place.

    Fields that already carry a truthy displayName are left alone; the others
    receive the predicate URI formatted by format_uri_as_readable.

    Args:
        form_fields: Dictionary of form fields to process
    """
    for predicate_map in form_fields.values():
        for predicate_uri, field_list in predicate_map.items():
            for field in field_list:
                if field.get("displayName"):
                    continue
                field["displayName"] = format_uri_as_readable(predicate_uri)

404 

405 

def find_matching_form_field(
    class_uri: str | None = None,
    shape_uri: str | None = None,
    form_fields: dict | None = None,
) -> tuple[str, str] | None:
    """
    Look up the form field key that best fits a class and/or a shape.

    An entry matching both class and shape is returned immediately.
    Otherwise a shape-only entry (one without class) is preferred over a
    class match. Among class matches, an entry without shape beats an entry
    that has some other shape.

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        form_fields: Form fields to search in; when missing or empty, the
            result of get_form_fields() is used instead

    Returns:
        The matching form field key (class_uri, shape_uri) or None if no match
        is found
    """
    if not form_fields:
        form_fields = get_form_fields()
    if not form_fields:
        return None

    wanted_class = str(class_uri) if class_uri else None
    wanted_shape = str(shape_uri) if shape_uri else None

    best_by_class = None
    best_by_shape = None

    for key in form_fields:
        key_class, key_shape = key[0], key[1]
        class_hit = wanted_class is not None and key_class == wanted_class
        shape_hit = wanted_shape is not None and key_shape == wanted_shape

        # Exact match on both parts wins outright.
        if class_hit and shape_hit:
            return key

        if class_hit and key_shape is None:
            # Class-only entry: always takes over as the class match.
            best_by_class = key
        elif shape_hit and key_class is None:
            # Shape-only entry.
            best_by_shape = key
        elif class_hit and not best_by_class:
            # Same class but bound to another shape: only a fallback.
            best_by_class = key

    # Shape matches are considered more specific than class matches.
    for candidate in (best_by_shape, best_by_class):
        if candidate:
            return candidate
    return None

468 

469 

470def _find_entity_position_in_order_map(entity_uri: str, order_map: dict) -> int | None: 

471 """ 

472 Helper function to find entity position in an order map. 

473 

474 This function handles the case where there might be multiple independent ordered 

475 chains 

476 within the same predicate relationship. Each chain has its own starting element and 

477 follows a linked-list structure where each entity points to the next one. 

478 

479 Args: 

480 entity_uri: URI of the entity to find position for 

481 order_map: Dictionary mapping entities to their next entity in sequence. 

482 Key = current entity URI, Value = next entity URI (or None for last 

483 element) 

484 Example: {'entity1': 'entity2', 'entity2': 'entity3', 'entity3': 

485 None, 

486 'entity4': 'entity5', 'entity5': None} 

487 This represents two chains: [entity1 -> entity2 -> entity3] and 

488 [entity4 -> entity5] 

489 

490 Returns: 

491 1-based position in the sequence, or None if not found 

492 """ 

493 # Find all starting elements of ordered chains. 

494 # A start element is one that appears as a key in the order_map but never as a 

495 # value, 

496 # meaning no other entity points to it (it's the head of a chain). 

497 start_elements = set(order_map.keys()) - { 

498 v for v in order_map.values() if v is not None 

499 } 

500 

501 if not start_elements: 

502 # No valid starting points found - this shouldn't happen in well-formed data 

503 return None 

504 

505 # Since there can be multiple independent ordered chains, we need to check each one 

506 # to find which chain contains our target entity 

507 for start_element in start_elements: 

508 # Build the complete sequence for this chain by following the linked-list 

509 # structure 

510 sequence = [] 

511 current_element = start_element 

512 

513 # Follow the chain from start to end 

514 while current_element in order_map: 

515 sequence.append(current_element) 

516 # Move to the next element in the chain (or None if we've reached the end) 

517 current_element = order_map[current_element] 

518 

519 # Check if our target entity is in this particular chain 

520 try: 

521 # If found, return its 1-based position within this chain 

522 return ( 

523 sequence.index(entity_uri) + 1 

524 ) # Convert from 0-based to 1-based indexing 

525 except ValueError: 

526 # Entity not found in this chain, try the next one 

527 continue 

528 

529 # Entity was not found in any of the ordered chains 

530 return None 

531 

532 

def get_entity_position_in_sequence(
    entity_uri: str,
    subject_uri: str,
    predicate_uri: str,
    order_property: str,
    snapshot: Graph | None = None,
) -> int | None:
    """
    Get the position of an entity in an ordered sequence.

    The ordered entities of the subject are fetched together with the entity
    each of them points to through order_property, and the position is then
    computed from that linked structure.

    Args:
        entity_uri: URI of the entity to find position for
        subject_uri: URI of the subject that has the ordered property
        predicate_uri: URI of the ordered predicate
        order_property: URI of the property that defines the ordering
        snapshot: Optional graph snapshot for historical queries. Whenever a
            snapshot is passed it is the one queried, even if it is empty;
            only None selects the SPARQL endpoint from get_sparql().

    Returns:
        1-based position in the sequence, or None if not found
    """
    # Entities without a successor are reported with the literal "NONE" so
    # that every row carries a value for ?nextValue.
    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{subject_uri}> <{predicate_uri}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """

    # Compare against None explicitly: an rdflib Graph without triples is
    # falsy, and an empty historical snapshot must not fall through to the
    # live endpoint.
    if snapshot is not None:
        rows = select_results(snapshot.query(order_query))
        links = [(str(row[0]), str(row[1])) for row in rows]
    else:
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        bindings = get_sparql_bindings(sparql.query().convert())
        links = [
            (binding["orderedEntity"]["value"], binding["nextValue"]["value"])
            for binding in bindings
        ]

    order_map = {
        ordered_entity: None if next_value == "NONE" else next_value
        for ordered_entity, next_value in links
    }
    return _find_entity_position_in_order_map(entity_uri, order_map)