Coverage for heritrace / utils / shacl_utils.py: 96%

206 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-21 12:56 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from typing import List, Optional, Tuple 

6 

7from flask import Flask 

8from heritrace.extensions import get_shacl_graph, get_sparql 

9from heritrace.utils.display_rules_utils import get_class_priority 

10from heritrace.utils.shacl_display import (apply_display_rules, 

11 extract_shacl_form_fields, 

12 order_form_fields, 

13 process_nested_shapes) 

14from rdflib import RDF, Graph 

15from SPARQLWrapper import JSON 

16 

17 

def get_form_fields_from_shacl(shacl: Graph, display_rules: List[dict], app: Flask):
    """
    Build the form field definitions for every entity type described by the SHACL shapes.

    The pipeline extracts the raw fields, resolves nested node shapes, applies the
    display rules (when any are configured), fills in missing display names, adds
    virtual properties and finally orders the result.

    Args:
        shacl: The SHACL graph
        display_rules: The display rules configuration
        app: Flask application instance

    Returns:
        The value returned by ``order_form_fields``: a mapping whose keys are
        (class, shape) tuples and whose values map each predicate to its list of
        field definitions. A plain empty dict is returned when ``shacl`` is falsy.
    """
    if not shacl:
        return dict()

    # Raw field definitions, straight from the shapes
    fields = extract_shacl_form_fields(shacl, display_rules, app=app)

    # Resolve nested node shapes. The same set is handed to every call so that
    # process_nested_shapes can keep track of the shapes it has already handled.
    seen_shapes = set()
    for predicate_map in fields.values():
        for field_list in predicate_map.values():
            for field in field_list:
                if not field.get("nodeShape"):
                    continue
                field["nestedShape"] = process_nested_shapes(
                    shacl,
                    display_rules,
                    field["nodeShape"],
                    app=app,
                    processed_shapes=seen_shapes,
                )

    # Display rules are optional; without them the raw fields are kept as they are
    if display_rules:
        fields = apply_display_rules(shacl, fields, display_rules)

    # Fields that still lack a displayName get a fallback one
    ensure_display_names(fields)

    # Virtual properties are added before ordering so that they are ordered too
    fields_with_virtual = add_virtual_properties_to_form_fields_internal(fields)

    return order_form_fields(fields_with_virtual, display_rules)

65 

66 

def add_virtual_properties_to_form_fields_internal(form_fields: dict) -> dict:
    """
    Add virtual properties to form_fields during initial processing.

    For every (class, shape) key, the virtual properties returned by
    ``get_virtual_properties_for_entity`` are turned into synthetic field
    definitions and stored under their display name. The nested fields of a
    virtual property are copied from the form fields of the intermediate entity
    named by ``implementedVia.target``, with ``fieldOverrides`` applied.

    The outer mapping and each per-entity predicate mapping are copied before
    anything is added, so the dictionaries passed in by the caller are not
    modified. The individual field definitions are still shared with the input.

    Args:
        form_fields: The original form_fields dictionary

    Returns:
        Enhanced form_fields dictionary with virtual properties included
    """
    from heritrace.utils.virtual_properties import get_virtual_properties_for_entity

    enhanced_form_fields = form_fields.copy() if form_fields else {}

    # A shallow copy of the outer dict still shares the per-entity dicts with the
    # caller; copy those too, since virtual properties are written into them.
    for entity_key in list(enhanced_form_fields):
        enhanced_form_fields[entity_key] = enhanced_form_fields[entity_key].copy()

    for entity_key in enhanced_form_fields:
        entity_class, entity_shape = entity_key

        virtual_properties = get_virtual_properties_for_entity(entity_class, entity_shape)
        if not virtual_properties:
            continue

        for display_name, prop_config in virtual_properties:
            if not prop_config.get("shouldBeDisplayed", True):
                continue

            implementation = prop_config.get("implementedVia", {})
            target = implementation.get("target", {})
            intermediate_class = target.get("class")
            specific_shape = target.get("shape")

            # Without an explicit shape, fall back to the best shape for the class
            if not specific_shape and intermediate_class:
                specific_shape = determine_shape_for_classes([intermediate_class])

            intermediate_entity_key = find_matching_form_field(
                class_uri=intermediate_class,
                shape_uri=specific_shape,
                form_fields=enhanced_form_fields,
            )

            nested_shape_list = []
            if intermediate_entity_key:
                nested_shape_list = _collect_virtual_nested_fields(
                    enhanced_form_fields.get(intermediate_entity_key, {}),
                    implementation.get("fieldOverrides", {}),
                )

            virtual_form_field = {
                "displayName": prop_config.get("displayName", display_name),
                "uri": display_name,
                "is_virtual": True,
                "min": 0,
                "max": None,
                "datatypes": [],
                "optionalValues": [],
                "orderedBy": None,
                "nodeShape": None,
                "subjectClass": None,
                "subjectShape": None,
                "objectClass": None,
                "entityType": None,
                "nestedShape": nested_shape_list,
                "or": None,
            }

            enhanced_form_fields[entity_key][display_name] = [virtual_form_field]

    return enhanced_form_fields


def _collect_virtual_nested_fields(nested_shape_data: dict, field_overrides: dict) -> list:
    """Copy the fields of an intermediate entity, apply the overrides and drop hidden fields."""
    nested_fields = []

    for nested_prop_uri, nested_details_list in nested_shape_data.items():
        override = field_overrides.get(nested_prop_uri) if nested_prop_uri in field_overrides else None

        for nested_details in nested_details_list:
            # Work on a copy so the intermediate entity's own field stays untouched
            nested_field = nested_details.copy()

            if override is not None:
                if "shouldBeDisplayed" in override:
                    nested_field["shouldBeDisplayed"] = override["shouldBeDisplayed"]
                if "displayName" in override:
                    nested_field["displayName"] = override["displayName"]
                if "value" in override:
                    # A fixed value replaces whatever nested form the field had
                    nested_field["hasValue"] = override["value"]
                    nested_field["nestedShape"] = []

            if nested_field.get("shouldBeDisplayed", True):
                nested_fields.append(nested_field)

    return nested_fields

149 

150 

def determine_shape_for_classes(class_list: List[str]) -> Optional[str]:
    """
    Pick the SHACL shape to use for a list of class URIs.

    Every shape that declares one of the classes as its ``sh:targetClass`` is
    collected, and the choice among them is delegated to
    ``_find_highest_priority_shape``.

    Args:
        class_list: List of class URIs to find shapes for

    Returns:
        The shape URI selected by priority, or None if the SHACL graph is falsy
        or no shape targets any of the classes
    """
    shapes_graph = get_shacl_graph()
    if not shapes_graph:
        return None

    class_shape_pairs = []
    for class_uri in class_list:
        rows = shapes_graph.query(f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
            }}
        """)
        class_shape_pairs.extend((class_uri, str(row.shape)) for row in rows)

    return _find_highest_priority_shape(class_shape_pairs)

181 

182 

def determine_shape_for_entity_triples(entity_triples: list) -> Optional[str]:
    """
    Pick the SHACL shape that best fits an entity, given its triples.

    Candidate shapes are those targeting one of the entity's rdf:type classes.
    When several candidates exist they are ranked on three criteria, compared
    in this order:
    1. number of sh:hasValue constraints the entity satisfies
    2. number of shape properties that occur among the entity's predicates
    3. class priority, as given by ``get_class_priority``

    Args:
        entity_triples: List of triples (subject, predicate, object) for the entity

    Returns:
        The selected shape URI, or None if the SHACL graph is falsy, the entity
        has no rdf:type, or no shape targets any of its classes
    """
    shapes_graph = get_shacl_graph()
    if not shapes_graph:
        return None

    rdf_type = str(RDF.type)
    type_uris = []
    predicates_seen = set()
    for _subject, predicate, obj in entity_triples:
        predicate_str = str(predicate)
        if predicate_str == rdf_type:
            type_uris.append(str(obj))
        predicates_seen.add(predicate_str)

    if not type_uris:
        return None

    candidates = []
    for class_uri in type_uris:
        rows = shapes_graph.query(f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
            }}
        """)
        candidates.extend((class_uri, str(row.shape)) for row in rows)

    if not candidates:
        return None

    # A single candidate needs no ranking
    if len(candidates) == 1:
        return candidates[0][1]

    scores = {}
    for class_uri, shape_uri in candidates:
        matched_properties = len(predicates_seen.intersection(_get_shape_properties(shapes_graph, shape_uri)))
        matched_values = _check_hasvalue_constraints(shapes_graph, shape_uri, entity_triples)
        priority = get_class_priority((class_uri, shape_uri))

        # Tuples compare element by element, so hasValue matches dominate, then
        # property matches. The priority is negated so that a smaller priority
        # number ranks higher under max().
        scores[shape_uri] = (matched_values, matched_properties, -priority)

    return max(scores, key=scores.get)

252 

253 

254def _find_highest_priority_shape(class_shape_pairs: List[Tuple[str, str]]) -> Optional[str]: 

255 """ 

256 Helper function to find the shape with the highest priority from a list of (class_uri, shape) pairs. 

257  

258 Args: 

259 class_shape_pairs: List of tuples (class_uri, shape) 

260  

261 Returns: 

262 The shape with the highest priority, or None if the list is empty 

263 """ 

264 highest_priority = float('inf') 

265 highest_priority_shape = None 

266 

267 for class_uri, shape in class_shape_pairs: 

268 entity_key = (class_uri, shape) 

269 priority = get_class_priority(entity_key) 

270 if priority < highest_priority: 

271 highest_priority = priority 

272 highest_priority_shape = shape 

273 

274 return highest_priority_shape 

275 

276 

277def _get_shape_properties(shacl_graph: Graph, shape_uri: str) -> set: 

278 """ 

279 Extract all properties defined in a SHACL shape. 

280  

281 Args: 

282 shacl_graph: The SHACL graph 

283 shape_uri: URI of the shape to analyze 

284  

285 Returns: 

286 Set of property URIs defined in the shape 

287 """ 

288 properties = set() 

289 

290 query_string = f""" 

291 PREFIX sh: <http://www.w3.org/ns/shacl#> 

292 SELECT DISTINCT ?property WHERE {{ 

293 <{shape_uri}> sh:property ?propertyShape . 

294 ?propertyShape sh:path ?property . 

295 }} 

296 """ 

297 

298 results = shacl_graph.query(query_string) 

299 for row in results: 

300 properties.add(str(row.property)) 

301 

302 return properties 

303 

304 

305def _check_hasvalue_constraints(shacl_graph: Graph, shape_uri: str, entity_triples: list) -> int: 

306 """ 

307 Check how many sh:hasValue constraints the entity satisfies for a given shape. 

308  

309 Args: 

310 shacl_graph: The SHACL graph 

311 shape_uri: URI of the shape to check 

312 entity_triples: List of triples (subject, predicate, object) for the entity 

313  

314 Returns: 

315 Number of hasValue constraints satisfied by the entity 

316 """ 

317 # Get all hasValue constraints for this shape 

318 query_string = f""" 

319 PREFIX sh: <http://www.w3.org/ns/shacl#> 

320 SELECT DISTINCT ?property ?value WHERE {{ 

321 <{shape_uri}> sh:property ?propertyShape . 

322 ?propertyShape sh:path ?property . 

323 ?propertyShape sh:hasValue ?value . 

324 }} 

325 """ 

326 

327 results = shacl_graph.query(query_string) 

328 constraints = [(str(row.property), str(row.value)) for row in results] 

329 

330 if not constraints: 

331 return 0 

332 

333 # Create a set of (predicate, object) pairs from entity triples 

334 entity_property_values = set() 

335 for _, predicate, obj in entity_triples: 

336 entity_property_values.add((str(predicate), str(obj))) 

337 

338 # Count how many constraints are satisfied 

339 satisfied_constraints = 0 

340 for property_uri, required_value in constraints: 

341 if (property_uri, required_value) in entity_property_values: 

342 satisfied_constraints += 1 

343 

344 return satisfied_constraints 

345 

346 

def ensure_display_names(form_fields):
    """
    Give every form field a displayName, in place.

    Fields whose displayName is missing or empty receive the predicate URI
    formatted by ``format_uri_as_readable``; fields that already have one are
    left untouched.

    Args:
        form_fields: Dictionary of form fields to process
    """
    from heritrace.utils.filters import format_uri_as_readable

    for predicate_map in form_fields.values():
        for predicate_uri, field_list in predicate_map.items():
            for field in field_list:
                if field.get("displayName"):
                    continue
                field["displayName"] = format_uri_as_readable(predicate_uri)

362 

363 

def find_matching_form_field(class_uri=None, shape_uri=None, form_fields=None):
    """
    Find the form field key that best matches a class and/or a shape.
    At least one of class_uri or shape_uri should be provided.

    Matching preference, from strongest to weakest:
    1. a key whose class and shape both match (returned immediately)
    2. a key with no class whose shape matches
    3. a key with no shape whose class matches
    4. the first key whose class matches, whatever its shape

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        form_fields: Optional dictionary of form fields to search in; when it is
            missing or empty the result of ``get_form_fields()`` is used

    Returns:
        The matching form field key (class_uri, shape_uri) or None if no match is found
    """
    if not form_fields:
        from heritrace.extensions import get_form_fields
        form_fields = get_form_fields()

    if not form_fields:
        return None

    wanted_class = str(class_uri) if class_uri else None
    wanted_shape = str(shape_uri) if shape_uri else None

    by_class = None
    by_shape = None

    for key in form_fields:
        key_class = key[0]
        key_shape = key[1]

        class_hit = wanted_class is not None and key_class == wanted_class
        shape_hit = wanted_shape is not None and key_shape == wanted_shape

        # Exact match on both parts wins outright
        if class_hit and shape_hit:
            return key

        if class_hit:
            # A shape-less key always takes the slot; a key carrying some other
            # shape only fills it while it is still empty.
            if key_shape is None or not by_class:
                by_class = key
        elif shape_hit and key_class is None:
            by_shape = key

    # Shape-only matches are preferred over class-only matches
    return by_shape or by_class

417 

418 

419def _find_entity_position_in_order_map(entity_uri: str, order_map: dict) -> Optional[int]: 

420 """ 

421 Helper function to find entity position in an order map. 

422  

423 This function handles the case where there might be multiple independent ordered chains 

424 within the same predicate relationship. Each chain has its own starting element and 

425 follows a linked-list structure where each entity points to the next one. 

426  

427 Args: 

428 entity_uri: URI of the entity to find position for 

429 order_map: Dictionary mapping entities to their next entity in sequence. 

430 Key = current entity URI, Value = next entity URI (or None for last element) 

431 Example: {'entity1': 'entity2', 'entity2': 'entity3', 'entity3': None, 

432 'entity4': 'entity5', 'entity5': None} 

433 This represents two chains: [entity1 -> entity2 -> entity3] and [entity4 -> entity5] 

434  

435 Returns: 

436 1-based position in the sequence, or None if not found 

437 """ 

438 # Find all starting elements of ordered chains. 

439 # A start element is one that appears as a key in the order_map but never as a value, 

440 # meaning no other entity points to it (it's the head of a chain). 

441 start_elements = set(order_map.keys()) - set(v for v in order_map.values() if v is not None) 

442 

443 if not start_elements: 

444 # No valid starting points found - this shouldn't happen in well-formed data 

445 return None 

446 

447 # Since there can be multiple independent ordered chains, we need to check each one 

448 # to find which chain contains our target entity 

449 for start_element in start_elements: 

450 # Build the complete sequence for this chain by following the linked-list structure 

451 sequence = [] 

452 current_element = start_element 

453 

454 # Follow the chain from start to end 

455 while current_element in order_map: 

456 sequence.append(current_element) 

457 # Move to the next element in the chain (or None if we've reached the end) 

458 current_element = order_map[current_element] 

459 

460 # Check if our target entity is in this particular chain 

461 try: 

462 # If found, return its 1-based position within this chain 

463 return sequence.index(entity_uri) + 1 # Convert from 0-based to 1-based indexing 

464 except ValueError: 

465 # Entity not found in this chain, try the next one 

466 continue 

467 

468 # Entity was not found in any of the ordered chains 

469 return None 

470 

471 

def get_entity_position_in_sequence(entity_uri: str, subject_uri: str, predicate_uri: str,
                                    order_property: str, snapshot: Optional[Graph] = None) -> Optional[int]:
    """
    Get the position of an entity in an ordered sequence.

    The entities linked to the subject through the predicate are fetched along
    with the entity each of them points to via the order property. The query
    runs against ``snapshot`` when one is given, otherwise against the endpoint
    returned by ``get_sparql()``.

    Args:
        entity_uri: URI of the entity to find position for
        subject_uri: URI of the subject that has the ordered property
        predicate_uri: URI of the ordered predicate
        order_property: URI of the property that defines the ordering
        snapshot: Optional graph snapshot for historical queries. Any graph that
            is not None is used, including an empty one.

    Returns:
        1-based position in the sequence, or None if not found
    """
    # COALESCE turns an unbound ?next into the "NONE" marker, so the last
    # element of a chain still produces a value in every result row.
    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{subject_uri}> <{predicate_uri}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """

    # Compare against None explicitly: an empty graph is falsy, and a plain
    # truth test would send a historical lookup to the live endpoint.
    if snapshot is not None:
        links = [(str(row[0]), str(row[1])) for row in snapshot.query(order_query)]
    else:
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        bindings = sparql.query().convert().get("results", {}).get("bindings", [])
        links = [
            (binding["orderedEntity"]["value"], binding["nextValue"]["value"])
            for binding in bindings
        ]

    order_map = {
        ordered_entity: (None if next_value == "NONE" else next_value)
        for ordered_entity, next_value in links
    }

    return _find_entity_position_in_order_map(entity_uri, order_map)