Coverage for heritrace/utils/shacl_utils.py: 96%

206 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-10-13 17:12 +0000

1from typing import List, Optional, Tuple 

2 

3from flask import Flask 

4from heritrace.extensions import get_shacl_graph, get_sparql 

5from heritrace.utils.display_rules_utils import get_class_priority 

6from heritrace.utils.shacl_display import (apply_display_rules, 

7 extract_shacl_form_fields, 

8 order_form_fields, 

9 process_nested_shapes) 

10from rdflib import RDF, Graph 

11from SPARQLWrapper import JSON 

12 

13 

def get_form_fields_from_shacl(shacl: Graph, display_rules: List[dict], app: Flask):
    """
    Build the form field configuration for every entity type described by the SHACL shapes.

    The pipeline extracts the raw fields, resolves nested node shapes, applies the
    display rules (when any are given), fills in missing display names, adds
    virtual properties and finally orders the result.

    Args:
        shacl: The SHACL graph
        display_rules: The display rules configuration
        app: Flask application instance

    Returns:
        The ordered form fields as produced by order_form_fields: a mapping whose
        keys are (class, shape) tuples and whose values map each predicate to a
        list of field dictionaries. An empty dict is returned when shacl is
        empty or None.
    """
    if not shacl:
        return {}

    # Step 1: raw form fields straight from the SHACL shapes
    fields_by_entity = extract_shacl_form_fields(shacl, display_rules, app=app)

    # Step 2: resolve the nested shape of every field that points to a node shape.
    # The set is shared across all calls to process_nested_shapes.
    seen_shapes = set()
    for predicate_map in fields_by_entity.values():
        for field_list in predicate_map.values():
            for field in field_list:
                if not field.get("nodeShape"):
                    continue
                field["nestedShape"] = process_nested_shapes(
                    shacl,
                    display_rules,
                    field["nodeShape"],
                    app=app,
                    processed_shapes=seen_shapes,
                )

    # Step 3: display rules are optional
    if display_rules:
        fields_by_entity = apply_display_rules(shacl, fields_by_entity, display_rules)

    # Step 3.5: fields not covered by a display rule still need a displayName
    ensure_display_names(fields_by_entity)

    # Step 4: virtual properties are added before ordering so they get ordered too
    with_virtual_properties = add_virtual_properties_to_form_fields_internal(fields_by_entity)

    # Step 5: final ordering
    return order_form_fields(with_virtual_properties, display_rules)

61 

62 

def _build_virtual_nested_fields(nested_shape_data: dict, field_overrides: dict) -> list:
    """Copy the intermediate entity's fields, apply overrides, keep the displayable ones."""
    nested_fields = []

    for nested_prop_uri, nested_details_list in nested_shape_data.items():
        for nested_details in nested_details_list:
            # Work on a copy so the intermediate entity's own field is left untouched
            nested_field = nested_details.copy()

            if nested_prop_uri in field_overrides:
                override = field_overrides[nested_prop_uri]
                if "shouldBeDisplayed" in override:
                    nested_field["shouldBeDisplayed"] = override["shouldBeDisplayed"]
                if "displayName" in override:
                    nested_field["displayName"] = override["displayName"]
                if "value" in override:
                    # A fixed value replaces whatever nested form the field had
                    nested_field["hasValue"] = override["value"]
                    nested_field["nestedShape"] = []

            if nested_field.get("shouldBeDisplayed", True):
                nested_fields.append(nested_field)

    return nested_fields


def add_virtual_properties_to_form_fields_internal(form_fields: dict) -> dict:
    """
    Add virtual properties to form_fields during initial processing.

    For every (class, shape) key, the virtual properties configured for that
    entity are looked up; each displayable one is stored under its display name
    as a single-element list holding a field dictionary flagged with
    "is_virtual". The nested shape of that field is derived from the form fields
    of the intermediate entity named in the property's "implementedVia" target.

    Args:
        form_fields: The original form_fields dictionary. It is not modified:
            both the outer mapping and the per-entity predicate maps are copied.

    Returns:
        Enhanced form_fields dictionary with virtual properties included
    """
    from heritrace.utils.virtual_properties import get_virtual_properties_for_entity

    enhanced_form_fields = form_fields.copy() if form_fields else {}
    # dict.copy() is shallow: without copying the per-entity maps as well, the
    # assignments below would write the virtual properties into the caller's input.
    for entity_key, predicates in list(enhanced_form_fields.items()):
        enhanced_form_fields[entity_key] = predicates.copy()

    for entity_key in enhanced_form_fields:
        entity_class, entity_shape = entity_key

        virtual_properties = get_virtual_properties_for_entity(entity_class, entity_shape)
        if not virtual_properties:
            continue

        for display_name, prop_config in virtual_properties:
            if not prop_config.get("shouldBeDisplayed", True):
                continue

            implementation = prop_config.get("implementedVia", {})
            target = implementation.get("target", {})
            intermediate_class = target.get("class")
            specific_shape = target.get("shape")

            # No explicit shape configured: derive one from the target class
            if not specific_shape and intermediate_class:
                specific_shape = determine_shape_for_classes([intermediate_class])

            intermediate_entity_key = find_matching_form_field(
                class_uri=intermediate_class,
                shape_uri=specific_shape,
                form_fields=enhanced_form_fields,
            )

            nested_shape_list = []
            if intermediate_entity_key:
                nested_shape_list = _build_virtual_nested_fields(
                    enhanced_form_fields.get(intermediate_entity_key, {}),
                    implementation.get("fieldOverrides", {}),
                )

            virtual_form_field = {
                "displayName": prop_config.get("displayName", display_name),
                "uri": display_name,
                "is_virtual": True,
                "min": 0,
                "max": None,
                "datatypes": [],
                "optionalValues": [],
                "orderedBy": None,
                "nodeShape": None,
                "subjectClass": None,
                "subjectShape": None,
                "objectClass": None,
                "entityType": None,
                "nestedShape": nested_shape_list,
                "or": None,
            }

            enhanced_form_fields[entity_key][display_name] = [virtual_form_field]

    return enhanced_form_fields

145 

146 

def determine_shape_for_classes(class_list: List[str]) -> Optional[str]:
    """
    Pick the SHACL shape to use for a list of class URIs.

    Every shape that targets one of the classes via sh:targetClass is collected
    and the one with the best class priority is returned.

    Args:
        class_list: List of class URIs to find shapes for

    Returns:
        The most appropriate shape URI based on priority, or None if the SHACL
        graph is unavailable or no shape targets any of the classes
    """
    graph = get_shacl_graph()
    if not graph:
        return None

    class_shape_pairs = []
    for target_class in class_list:
        rows = graph.query(
            f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{target_class}> .
            }}
        """
        )
        class_shape_pairs.extend((target_class, str(row.shape)) for row in rows)

    return _find_highest_priority_shape(class_shape_pairs)

177 

178 

def determine_shape_for_entity_triples(entity_triples: list) -> Optional[str]:
    """
    Pick the SHACL shape that best describes an entity, given its triples.

    Candidate shapes are those targeting one of the entity's rdf:type classes.
    When several candidates exist they are ranked by a tuple score, compared
    element by element:
    1. number of sh:hasValue constraints the entity satisfies (most important)
    2. number of the shape's properties that the entity actually uses
    3. class priority (a lower priority value ranks higher)

    Args:
        entity_triples: List of triples (subject, predicate, object) for the entity

    Returns:
        The most appropriate shape URI, or None if the SHACL graph is
        unavailable, the entity has no rdf:type, or no shape targets its classes
    """
    graph = get_shacl_graph()
    if not graph:
        return None

    type_uris = []
    predicates_used = set()
    for _subject, predicate, obj in entity_triples:
        if str(predicate) == str(RDF.type):
            type_uris.append(str(obj))
        # Every predicate counts for property matching, rdf:type included
        predicates_used.add(str(predicate))

    if not type_uris:
        return None

    candidates = []
    for type_uri in type_uris:
        rows = graph.query(
            f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{type_uri}> .
            }}
        """
        )
        candidates.extend((type_uri, str(row.shape)) for row in rows)

    if not candidates:
        return None

    # A single candidate needs no scoring
    if len(candidates) == 1:
        return candidates[0][1]

    scores = {}
    for type_uri, shape_uri in candidates:
        declared_properties = _get_shape_properties(graph, shape_uri)
        shared_property_count = len(predicates_used.intersection(declared_properties))
        satisfied_values = _check_hasvalue_constraints(graph, shape_uri, entity_triples)
        rank = get_class_priority((type_uri, shape_uri))

        # Priority is negated so that a smaller priority value gives a larger score
        scores[shape_uri] = (satisfied_values, shared_property_count, -rank)

    return max(scores, key=scores.get)

248 

249 

250def _find_highest_priority_shape(class_shape_pairs: List[Tuple[str, str]]) -> Optional[str]: 

251 """ 

252 Helper function to find the shape with the highest priority from a list of (class_uri, shape) pairs. 

253  

254 Args: 

255 class_shape_pairs: List of tuples (class_uri, shape) 

256  

257 Returns: 

258 The shape with the highest priority, or None if the list is empty 

259 """ 

260 highest_priority = float('inf') 

261 highest_priority_shape = None 

262 

263 for class_uri, shape in class_shape_pairs: 

264 entity_key = (class_uri, shape) 

265 priority = get_class_priority(entity_key) 

266 if priority < highest_priority: 

267 highest_priority = priority 

268 highest_priority_shape = shape 

269 

270 return highest_priority_shape 

271 

272 

def _get_shape_properties(shacl_graph: Graph, shape_uri: str) -> set:
    """
    Collect the property paths declared by a SHACL shape.

    Args:
        shacl_graph: The SHACL graph
        shape_uri: URI of the shape to analyze

    Returns:
        Set of property URIs (as strings) reachable through sh:property / sh:path
    """
    sparql = f"""
        PREFIX sh: <http://www.w3.org/ns/shacl#>
        SELECT DISTINCT ?property WHERE {{
            <{shape_uri}> sh:property ?propertyShape .
            ?propertyShape sh:path ?property .
        }}
    """

    return {str(row.property) for row in shacl_graph.query(sparql)}

299 

300 

def _check_hasvalue_constraints(shacl_graph: Graph, shape_uri: str, entity_triples: list) -> int:
    """
    Count the sh:hasValue constraints of a shape that the entity satisfies.

    A constraint is satisfied when the entity has a triple whose predicate and
    object, compared as strings, equal the constraint's path and required value.

    Args:
        shacl_graph: The SHACL graph
        shape_uri: URI of the shape to check
        entity_triples: List of triples (subject, predicate, object) for the entity

    Returns:
        Number of hasValue constraints satisfied by the entity
    """
    sparql = f"""
        PREFIX sh: <http://www.w3.org/ns/shacl#>
        SELECT DISTINCT ?property ?value WHERE {{
            <{shape_uri}> sh:property ?propertyShape .
            ?propertyShape sh:path ?property .
            ?propertyShape sh:hasValue ?value .
        }}
    """

    required_pairs = [
        (str(row.property), str(row.value)) for row in shacl_graph.query(sparql)
    ]
    if not required_pairs:
        return 0

    # (predicate, object) pairs the entity actually has
    present_pairs = {(str(predicate), str(obj)) for _, predicate, obj in entity_triples}

    return sum(1 for pair in required_pairs if pair in present_pairs)

341 

342 

def ensure_display_names(form_fields):
    """
    Give every form field a displayName, in place.

    Fields whose displayName is missing or empty get one derived from their
    predicate URI through format_uri_as_readable; existing names are kept.

    Args:
        form_fields: Dictionary of form fields to process
    """
    from heritrace.utils.filters import format_uri_as_readable

    for predicates in form_fields.values():
        for predicate_uri, details_list in predicates.items():
            for field_info in details_list:
                if field_info.get("displayName"):
                    continue
                field_info["displayName"] = format_uri_as_readable(predicate_uri)

358 

359 

def find_matching_form_field(class_uri=None, shape_uri=None, form_fields=None):
    """
    Find the form field key that best fits a class and/or a shape.

    At least one of class_uri or shape_uri must be provided; with neither,
    nothing can match and None is returned.

    Matching precedence:
    1. a key whose class and shape both match is returned immediately
    2. otherwise a key that matches the shape and has no class
    3. otherwise a key that matches the class and has no shape
    4. otherwise the first key that matches the class

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        form_fields: Optional dictionary of form fields to search in; when empty
            or None, the form fields from heritrace.extensions.get_form_fields
            are used

    Returns:
        The matching form field key (class_uri, shape_uri) or None if no match is found
    """
    if not form_fields:
        from heritrace.extensions import get_form_fields

        form_fields = get_form_fields()
        if not form_fields:
            return None

    wanted_class = str(class_uri) if class_uri else None
    wanted_shape = str(shape_uri) if shape_uri else None

    fallback_class_key = None
    fallback_shape_key = None

    for candidate in form_fields.keys():
        candidate_class = candidate[0]
        candidate_shape = candidate[1]

        class_hit = bool(class_uri) and candidate_class == wanted_class
        shape_hit = bool(shape_uri) and candidate_shape == wanted_shape

        # Exact match on both parts: nothing can beat it
        if class_hit and shape_hit:
            return candidate

        # Class matches and the key carries no shape constraint
        if class_hit and candidate_shape is None:
            fallback_class_key = candidate
            continue

        # Shape matches and the key carries no class constraint
        if shape_hit and candidate_class is None:
            fallback_shape_key = candidate
            continue

        # Class matches on a key bound to some other shape: weakest match,
        # kept only if no class match has been recorded yet
        if class_hit and not fallback_class_key:
            fallback_class_key = candidate

    # Shape rules typically have higher specificity, so they are preferred
    if fallback_shape_key:
        return fallback_shape_key
    if fallback_class_key:
        return fallback_class_key
    return None

413 

414 

415def _find_entity_position_in_order_map(entity_uri: str, order_map: dict) -> Optional[int]: 

416 """ 

417 Helper function to find entity position in an order map. 

418  

419 This function handles the case where there might be multiple independent ordered chains 

420 within the same predicate relationship. Each chain has its own starting element and 

421 follows a linked-list structure where each entity points to the next one. 

422  

423 Args: 

424 entity_uri: URI of the entity to find position for 

425 order_map: Dictionary mapping entities to their next entity in sequence. 

426 Key = current entity URI, Value = next entity URI (or None for last element) 

427 Example: {'entity1': 'entity2', 'entity2': 'entity3', 'entity3': None, 

428 'entity4': 'entity5', 'entity5': None} 

429 This represents two chains: [entity1 -> entity2 -> entity3] and [entity4 -> entity5] 

430  

431 Returns: 

432 1-based position in the sequence, or None if not found 

433 """ 

434 # Find all starting elements of ordered chains. 

435 # A start element is one that appears as a key in the order_map but never as a value, 

436 # meaning no other entity points to it (it's the head of a chain). 

437 start_elements = set(order_map.keys()) - set(v for v in order_map.values() if v is not None) 

438 

439 if not start_elements: 

440 # No valid starting points found - this shouldn't happen in well-formed data 

441 return None 

442 

443 # Since there can be multiple independent ordered chains, we need to check each one 

444 # to find which chain contains our target entity 

445 for start_element in start_elements: 

446 # Build the complete sequence for this chain by following the linked-list structure 

447 sequence = [] 

448 current_element = start_element 

449 

450 # Follow the chain from start to end 

451 while current_element in order_map: 

452 sequence.append(current_element) 

453 # Move to the next element in the chain (or None if we've reached the end) 

454 current_element = order_map[current_element] 

455 

456 # Check if our target entity is in this particular chain 

457 try: 

458 # If found, return its 1-based position within this chain 

459 return sequence.index(entity_uri) + 1 # Convert from 0-based to 1-based indexing 

460 except ValueError: 

461 # Entity not found in this chain, try the next one 

462 continue 

463 

464 # Entity was not found in any of the ordered chains 

465 return None 

466 

467 

def get_entity_position_in_sequence(entity_uri: str, subject_uri: str, predicate_uri: str,
                                    order_property: str, snapshot: Optional[Graph] = None) -> Optional[int]:
    """
    Get the position of an entity in an ordered sequence.

    The entities linked to the subject through the predicate are fetched
    together with the entity each one points to via the order property. The
    resulting entity -> next-entity map is then walked to locate the target.

    Args:
        entity_uri: URI of the entity to find position for
        subject_uri: URI of the subject that has the ordered property
        predicate_uri: URI of the ordered predicate
        order_property: URI of the property that defines the ordering
        snapshot: Optional graph snapshot for historical queries. Whenever a
            snapshot is given it is the one queried, even if it holds no
            triples; the SPARQL endpoint is used only when snapshot is None.

    Returns:
        1-based position in the sequence, or None if not found
    """
    # COALESCE turns a missing ?next into the marker "NONE" so every row has a value
    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{subject_uri}> <{predicate_uri}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """

    # Compare against None rather than relying on truthiness: an rdflib Graph
    # without triples is falsy, and an empty historical snapshot must not fall
    # through to the live endpoint.
    if snapshot is not None:
        entity_next_pairs = [
            (str(row[0]), str(row[1])) for row in snapshot.query(order_query)
        ]
    else:
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        bindings = sparql.query().convert().get("results", {}).get("bindings", [])
        entity_next_pairs = [
            (binding["orderedEntity"]["value"], binding["nextValue"]["value"])
            for binding in bindings
        ]

    order_map = {
        ordered_entity: (None if next_value == "NONE" else next_value)
        for ordered_entity, next_value in entity_next_pairs
    }

    return _find_entity_position_in_order_map(entity_uri, order_map)