Coverage for heritrace/utils/shacl_utils.py: 96%

164 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-08-01 22:12 +0000

1from typing import List, Optional, Tuple 

2 

3from flask import Flask 

4from heritrace.extensions import get_shacl_graph, get_sparql 

5from heritrace.utils.display_rules_utils import get_class_priority 

6from heritrace.utils.shacl_display import (apply_display_rules, 

7 extract_shacl_form_fields, 

8 order_form_fields, 

9 process_nested_shapes) 

10from rdflib import RDF, Graph 

11from SPARQLWrapper import JSON 

12 

13 

def get_form_fields_from_shacl(shacl: Graph, display_rules: List[dict], app: Flask):
    """
    Build the ordered form-field configuration from the SHACL shapes.

    The pipeline is: extract the raw fields, resolve nested shapes, apply the
    display rules (when any are given), fill in missing display names, and
    finally order the result.

    Args:
        shacl: The SHACL graph
        display_rules: The display rules configuration
        app: Flask application instance

    Returns:
        The mapping returned by ``order_form_fields`` (keys are expected to be
        (class, shape) tuples and values dictionaries of form fields), or an
        empty dict when ``shacl`` is falsy.
    """
    if not shacl:
        return {}

    # Step 1: raw form fields straight from the SHACL shapes
    fields_by_entity = extract_shacl_form_fields(shacl, display_rules, app=app)

    # Step 2: resolve nested shapes; the same set is handed to every call so
    # the helper can keep track of the shapes it has already processed
    seen_shapes = set()
    for predicates in fields_by_entity.values():
        for field_list in predicates.values():
            for field in field_list:
                node_shape = field.get("nodeShape")
                if not node_shape:
                    continue
                field["nestedShape"] = process_nested_shapes(
                    shacl,
                    display_rules,
                    node_shape,
                    app=app,
                    processed_shapes=seen_shapes,
                )

    # Step 3: apply display rules, only when some are configured
    if display_rules:
        fields_by_entity = apply_display_rules(shacl, fields_by_entity, display_rules)

    # Step 3.5: fields not covered by a display rule still need a displayName
    ensure_display_names(fields_by_entity)

    # Step 4: order the fields according to the display rules
    return order_form_fields(fields_by_entity, display_rules)

57 

58 

def determine_shape_for_classes(class_list: List[str]) -> Optional[str]:
    """
    Pick the SHACL shape to use for a list of class URIs.

    Every shape declaring one of the classes as ``sh:targetClass`` is
    collected as a (class_uri, shape) pair; the final choice among the pairs
    is delegated to ``_find_highest_priority_shape``.

    Args:
        class_list: List of class URIs to find shapes for

    Returns:
        The shape URI selected by priority, or None when no SHACL graph is
        available or no shape targets any of the classes
    """
    graph = get_shacl_graph()
    if not graph:
        return None

    class_shape_pairs = []
    for class_uri in class_list:
        target_query = f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
            }}
        """
        class_shape_pairs.extend(
            (class_uri, str(row.shape)) for row in graph.query(target_query)
        )

    return _find_highest_priority_shape(class_shape_pairs)

89 

90 

def determine_shape_for_entity_triples(entity_triples: list) -> Optional[str]:
    """
    Pick the SHACL shape that best describes an entity, given its triples.

    Candidate shapes are those targeting one of the entity's rdf:type classes.
    When several candidates exist they are ranked by a tuple compared
    left to right:
    1. number of satisfied sh:hasValue constraints (most significant)
    2. number of shape properties that the entity actually uses
    3. class priority (a lower priority value ranks higher)

    Args:
        entity_triples: List of triples (subject, predicate, object) for the entity

    Returns:
        The best-ranked shape URI, or None when there is no SHACL graph, the
        entity has no rdf:type, or no shape targets its classes
    """
    graph = get_shacl_graph()
    if not graph:
        return None

    rdf_type = str(RDF.type)
    classes_seen = []
    predicates_seen = set()
    for _subject, predicate, obj in entity_triples:
        predicate_str = str(predicate)
        if predicate_str == rdf_type:
            classes_seen.append(str(obj))
        # Every predicate (rdf:type included) counts as a used property
        predicates_seen.add(predicate_str)

    if not classes_seen:
        return None

    candidates = []
    for class_uri in classes_seen:
        target_query = f"""
            SELECT DISTINCT ?shape WHERE {{
                ?shape <http://www.w3.org/ns/shacl#targetClass> <{class_uri}> .
            }}
        """
        candidates.extend(
            (class_uri, str(row.shape)) for row in graph.query(target_query)
        )

    if not candidates:
        return None

    # A single candidate needs no scoring
    if len(candidates) == 1:
        return candidates[0][1]

    scores = {}
    for class_uri, shape_uri in candidates:
        declared_properties = _get_shape_properties(graph, shape_uri)
        matched_properties = len(predicates_seen.intersection(declared_properties))
        matched_values = _check_hasvalue_constraints(graph, shape_uri, entity_triples)
        priority = get_class_priority((class_uri, shape_uri))

        # Priority is negated so that a smaller priority value sorts higher;
        # a shape reached through several classes keeps its last score
        scores[shape_uri] = (matched_values, matched_properties, -priority)

    return max(scores, key=scores.get)

160 

161 

162def _find_highest_priority_shape(class_shape_pairs: List[Tuple[str, str]]) -> Optional[str]: 

163 """ 

164 Helper function to find the shape with the highest priority from a list of (class_uri, shape) pairs. 

165  

166 Args: 

167 class_shape_pairs: List of tuples (class_uri, shape) 

168  

169 Returns: 

170 The shape with the highest priority, or None if the list is empty 

171 """ 

172 highest_priority = float('inf') 

173 highest_priority_shape = None 

174 

175 for class_uri, shape in class_shape_pairs: 

176 entity_key = (class_uri, shape) 

177 priority = get_class_priority(entity_key) 

178 if priority < highest_priority: 

179 highest_priority = priority 

180 highest_priority_shape = shape 

181 

182 return highest_priority_shape 

183 

184 

185def _get_shape_properties(shacl_graph: Graph, shape_uri: str) -> set: 

186 """ 

187 Extract all properties defined in a SHACL shape. 

188  

189 Args: 

190 shacl_graph: The SHACL graph 

191 shape_uri: URI of the shape to analyze 

192  

193 Returns: 

194 Set of property URIs defined in the shape 

195 """ 

196 properties = set() 

197 

198 query_string = f""" 

199 PREFIX sh: <http://www.w3.org/ns/shacl#> 

200 SELECT DISTINCT ?property WHERE {{ 

201 <{shape_uri}> sh:property ?propertyShape . 

202 ?propertyShape sh:path ?property . 

203 }} 

204 """ 

205 

206 results = shacl_graph.query(query_string) 

207 for row in results: 

208 properties.add(str(row.property)) 

209 

210 return properties 

211 

212 

213def _check_hasvalue_constraints(shacl_graph: Graph, shape_uri: str, entity_triples: list) -> int: 

214 """ 

215 Check how many sh:hasValue constraints the entity satisfies for a given shape. 

216  

217 Args: 

218 shacl_graph: The SHACL graph 

219 shape_uri: URI of the shape to check 

220 entity_triples: List of triples (subject, predicate, object) for the entity 

221  

222 Returns: 

223 Number of hasValue constraints satisfied by the entity 

224 """ 

225 # Get all hasValue constraints for this shape 

226 query_string = f""" 

227 PREFIX sh: <http://www.w3.org/ns/shacl#> 

228 SELECT DISTINCT ?property ?value WHERE {{ 

229 <{shape_uri}> sh:property ?propertyShape . 

230 ?propertyShape sh:path ?property . 

231 ?propertyShape sh:hasValue ?value . 

232 }} 

233 """ 

234 

235 results = shacl_graph.query(query_string) 

236 constraints = [(str(row.property), str(row.value)) for row in results] 

237 

238 if not constraints: 

239 return 0 

240 

241 # Create a set of (predicate, object) pairs from entity triples 

242 entity_property_values = set() 

243 for _, predicate, obj in entity_triples: 

244 entity_property_values.add((str(predicate), str(obj))) 

245 

246 # Count how many constraints are satisfied 

247 satisfied_constraints = 0 

248 for property_uri, required_value in constraints: 

249 if (property_uri, required_value) in entity_property_values: 

250 satisfied_constraints += 1 

251 

252 return satisfied_constraints 

253 

254 

def ensure_display_names(form_fields):
    """
    Give every form field a displayName, in place.

    Fields whose "displayName" is missing or falsy receive the readable form
    of their predicate URI; fields that already have one are left untouched.

    Args:
        form_fields: Dictionary of form fields to process
    """
    # Imported inside the function, as in the rest of this module's lazy imports
    from heritrace.utils.filters import format_uri_as_readable

    for predicates in form_fields.values():
        for predicate_uri, field_list in predicates.items():
            for field in field_list:
                if field.get("displayName"):
                    continue
                field["displayName"] = format_uri_as_readable(predicate_uri)

270 

271 

def find_matching_form_field(class_uri=None, shape_uri=None, form_fields=None):
    """
    Locate the form-field key that best fits a class and/or a shape.

    Keys are indexed as (class_uri, shape_uri). An exact match on both parts
    is returned as soon as it is met. Otherwise a key matching only the shape
    (with no class part) is preferred over a key matching only the class
    (with no shape part).

    Args:
        class_uri: Optional URI of the class
        shape_uri: Optional URI of the shape
        form_fields: Optional dictionary of form fields to search in; when
            falsy, the result of ``get_form_fields()`` is used instead

    Returns:
        The matching form field key (class_uri, shape_uri) or None if no match is found
    """
    if not form_fields:
        from heritrace.extensions import get_form_fields
        form_fields = get_form_fields()

    if not form_fields:
        return None

    wanted_class = str(class_uri) if class_uri else None
    wanted_shape = str(shape_uri) if shape_uri else None

    class_only_key = None
    shape_only_key = None

    for key in form_fields.keys():
        key_class, key_shape = key[0], key[1]
        class_hit = bool(class_uri) and key_class == wanted_class
        shape_hit = bool(shape_uri) and key_shape == wanted_shape

        if class_hit and shape_hit:
            # Exact match: nothing can be more specific
            return key
        if class_hit and key_shape is None:
            class_only_key = key
        elif shape_hit and key_class is None:
            shape_only_key = key

    # Shape rules are treated as more specific than class rules
    return shape_only_key or class_only_key

321 

322 

323def _find_entity_position_in_order_map(entity_uri: str, order_map: dict) -> Optional[int]: 

324 """ 

325 Helper function to find entity position in an order map. 

326  

327 This function handles the case where there might be multiple independent ordered chains 

328 within the same predicate relationship. Each chain has its own starting element and 

329 follows a linked-list structure where each entity points to the next one. 

330  

331 Args: 

332 entity_uri: URI of the entity to find position for 

333 order_map: Dictionary mapping entities to their next entity in sequence. 

334 Key = current entity URI, Value = next entity URI (or None for last element) 

335 Example: {'entity1': 'entity2', 'entity2': 'entity3', 'entity3': None, 

336 'entity4': 'entity5', 'entity5': None} 

337 This represents two chains: [entity1 -> entity2 -> entity3] and [entity4 -> entity5] 

338  

339 Returns: 

340 1-based position in the sequence, or None if not found 

341 """ 

342 # Find all starting elements of ordered chains. 

343 # A start element is one that appears as a key in the order_map but never as a value, 

344 # meaning no other entity points to it (it's the head of a chain). 

345 start_elements = set(order_map.keys()) - set(v for v in order_map.values() if v is not None) 

346 

347 if not start_elements: 

348 # No valid starting points found - this shouldn't happen in well-formed data 

349 return None 

350 

351 # Since there can be multiple independent ordered chains, we need to check each one 

352 # to find which chain contains our target entity 

353 for start_element in start_elements: 

354 # Build the complete sequence for this chain by following the linked-list structure 

355 sequence = [] 

356 current_element = start_element 

357 

358 # Follow the chain from start to end 

359 while current_element in order_map: 

360 sequence.append(current_element) 

361 # Move to the next element in the chain (or None if we've reached the end) 

362 current_element = order_map[current_element] 

363 

364 # Check if our target entity is in this particular chain 

365 try: 

366 # If found, return its 1-based position within this chain 

367 return sequence.index(entity_uri) + 1 # Convert from 0-based to 1-based indexing 

368 except ValueError: 

369 # Entity not found in this chain, try the next one 

370 continue 

371 

372 # Entity was not found in any of the ordered chains 

373 return None 

374 

375 

def get_entity_position_in_sequence(entity_uri: str, subject_uri: str, predicate_uri: str,
                                    order_property: str, snapshot: Optional[Graph] = None) -> Optional[int]:
    """
    Get the position of an entity in an ordered sequence.

    The entities linked to ``subject_uri`` through ``predicate_uri`` are
    fetched together with the entity each of them points to through
    ``order_property``; the resulting entity -> next-entity map is then
    resolved by ``_find_entity_position_in_order_map``.

    Args:
        entity_uri: URI of the entity to find position for
        subject_uri: URI of the subject that has the ordered property
        predicate_uri: URI of the ordered predicate
        order_property: URI of the property that defines the ordering
        snapshot: Optional graph snapshot for historical queries. Whenever a
            snapshot is supplied it is the one queried, even if it contains
            no triples; the live SPARQL endpoint is used only when it is None.

    Returns:
        1-based position in the sequence, or None if not found
    """
    # COALESCE yields the literal "NONE" for entities without a successor,
    # so every ordered entity produces a row with both columns bound.
    order_query = f"""
        SELECT ?orderedEntity (COALESCE(?next, "NONE") AS ?nextValue)
        WHERE {{
            <{subject_uri}> <{predicate_uri}> ?orderedEntity.
            OPTIONAL {{
                ?orderedEntity <{order_property}> ?next.
            }}
        }}
    """

    # Identity check, not truthiness: a snapshot with no triples is falsy and
    # would otherwise be silently replaced by a query against the live data.
    if snapshot is not None:
        pairs = [(str(row[0]), str(row[1])) for row in snapshot.query(order_query)]
    else:
        sparql = get_sparql()
        sparql.setQuery(order_query)
        sparql.setReturnFormat(JSON)
        bindings = sparql.query().convert().get("results", {}).get("bindings", [])
        pairs = [
            (binding["orderedEntity"]["value"], binding["nextValue"]["value"])
            for binding in bindings
        ]

    # Translate the "NONE" marker back into None (end of a chain)
    order_map = {
        ordered_entity: (None if next_value == "NONE" else next_value)
        for ordered_entity, next_value in pairs
    }

    return _find_entity_position_in_order_map(entity_uri, order_map)