Coverage for heritrace/utils/shacl_validation.py: 92%

205 statements  

coverage.py v7.6.12, created at 2025-10-13 17:12 +0000

import re
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union

import validators
from flask_babel import gettext
from heritrace.extensions import get_custom_filter, get_shacl_graph
from heritrace.utils.sparql_utils import fetch_data_graph_for_subject
from heritrace.utils.display_rules_utils import get_highest_priority_class
from rdflib import RDF, XSD, Literal, URIRef
from rdflib.plugins.sparql import prepareQuery
from heritrace.utils.datatypes import DATATYPE_MAPPING


def get_valid_predicates(
    triples: List[Tuple[URIRef, URIRef, Union[URIRef, Literal]]],
    highest_priority_class: URIRef
) -> Tuple[List[str], List[str], Dict, Dict, Dict, List[str]]:
    shacl = get_shacl_graph()

    existing_predicates = [triple[1] for triple in triples]
    predicate_counts = {
        str(predicate): existing_predicates.count(predicate)
        for predicate in set(existing_predicates)
    }
    default_datatypes = {
        str(predicate): XSD.string for predicate in existing_predicates
    }
    s_types = [triple[2] for triple in triples if triple[1] == RDF.type]

    valid_predicates = [
        {
            str(predicate): {
                "min": None,
                "max": None,
                "hasValue": None,
                "optionalValues": [],
            }
        }
        for predicate in set(existing_predicates)
    ]

    if not s_types:
        return (
            [str(predicate) for predicate in existing_predicates],
            [str(predicate) for predicate in existing_predicates],
            default_datatypes,
            dict(),
            dict(),
            [str(predicate) for predicate in existing_predicates],
        )
    if not shacl:
        return (
            [str(predicate) for predicate in existing_predicates],
            [str(predicate) for predicate in existing_predicates],
            default_datatypes,
            dict(),
            dict(),
            [str(predicate) for predicate in existing_predicates],
        )

    query_string = f"""
        SELECT ?predicate ?datatype ?maxCount ?minCount ?hasValue (GROUP_CONCAT(?optionalValue; separator=",") AS ?optionalValues) WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?property .
            VALUES ?type {{<{highest_priority_class}>}}
            ?property sh:path ?predicate .
            OPTIONAL {{?property sh:datatype ?datatype .}}
            OPTIONAL {{?property sh:maxCount ?maxCount .}}
            OPTIONAL {{?property sh:minCount ?minCount .}}
            OPTIONAL {{?property sh:hasValue ?hasValue .}}
            OPTIONAL {{
                ?property sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?property sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                OPTIONAL {{?orConstraint sh:datatype ?datatype .}}
                OPTIONAL {{?orConstraint sh:hasValue ?optionalValue .}}
            }}
            FILTER (isURI(?predicate))
        }}
        GROUP BY ?predicate ?datatype ?maxCount ?minCount ?hasValue
    """

    query = prepareQuery(
        query_string,
        initNs={
            "sh": "http://www.w3.org/ns/shacl#",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        },
    )
    results = shacl.query(query)

    # Convert results to list to properly check if there are any results
    # SPARQL iterators can be misleading about their emptiness
    results_list = list(results)

    # If there are no results, it means there are no shapes defined for this class
    # In this case, everything is allowed - behave as if there is no SHACL
    if not results_list:
        return (
            [str(predicate) for predicate in existing_predicates],
            [str(predicate) for predicate in existing_predicates],
            default_datatypes,
            dict(),
            dict(),
            [str(predicate) for predicate in existing_predicates],
        )

    valid_predicates = [
        {
            str(row.predicate): {
                "min": 0 if row.minCount is None else int(row.minCount),
                "max": None if row.maxCount is None else str(row.maxCount),
                "hasValue": row.hasValue,
                "optionalValues": (
                    row.optionalValues.split(",") if row.optionalValues else []
                ),
            }
        }
        for row in results_list
    ]

    can_be_added = set()
    can_be_deleted = set()
    mandatory_values = defaultdict(list)
    for valid_predicate in valid_predicates:
        for predicate, ranges in valid_predicate.items():
            if ranges["hasValue"]:
                mandatory_value_present = any(
                    triple[2] == ranges["hasValue"] for triple in triples
                )
                mandatory_values[str(predicate)].append(str(ranges["hasValue"]))
            else:
                max_reached = ranges["max"] is not None and int(
                    ranges["max"]
                ) <= predicate_counts.get(predicate, 0)

                if not max_reached:
                    can_be_added.add(predicate)
                if not (
                    ranges["min"] is not None
                    and int(ranges["min"]) == predicate_counts.get(predicate, 0)
                ):
                    can_be_deleted.add(predicate)

    datatypes = defaultdict(list)
    for row in results_list:
        if row.datatype:
            datatypes[str(row.predicate)].append(str(row.datatype))
        else:
            datatypes[str(row.predicate)].append(str(XSD.string))

    optional_values = dict()
    for valid_predicate in valid_predicates:
        for predicate, ranges in valid_predicate.items():
            if "optionalValues" in ranges:
                optional_values.setdefault(str(predicate), list()).extend(
                    ranges["optionalValues"]
                )
    return (
        list(can_be_added),
        list(can_be_deleted),
        dict(datatypes),
        mandatory_values,
        optional_values,
        {list(predicate_data.keys())[0] for predicate_data in valid_predicates},
    )

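
# Illustrative usage sketch: how get_valid_predicates might be called for an entity that
# already has a type and a title. The subject URI, class, and the _sketch_* helper are
# hypothetical, and the call assumes a Flask application context in which
# get_shacl_graph() returns the configured SHACL graph.
def _sketch_get_valid_predicates():
    subject = URIRef("https://example.org/entity/1")  # hypothetical subject
    triples = [
        (subject, RDF.type, URIRef("http://purl.org/spar/fabio/JournalArticle")),
        (subject, URIRef("http://purl.org/dc/terms/title"), Literal("An example title")),
    ]
    can_add, can_delete, datatypes, mandatory, optional, all_predicates = get_valid_predicates(
        triples, URIRef("http://purl.org/spar/fabio/JournalArticle")
    )
    # If a shape targeting fabio:JournalArticle declared sh:maxCount 1 on dcterms:title,
    # dcterms:title would be absent from can_add because its cardinality is already saturated.
    return can_add, can_delete, datatypes, mandatory, optional, all_predicates
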

def validate_new_triple(
    subject, predicate, new_value, action: str, old_value=None, entity_types=None, entity_shape=None
):
    data_graph = fetch_data_graph_for_subject(subject)
    if old_value is not None:
        matching_triples = [
            triple[2]
            for triple in data_graph.triples((URIRef(subject), URIRef(predicate), None))
            if str(triple[2]) == str(old_value)
        ]
        # Only update old_value if we found a match in the graph
        if matching_triples:
            old_value = matching_triples[0]
    if not len(get_shacl_graph()):
        # If there's no SHACL, we accept any value but preserve datatype if available
        if validators.url(new_value):
            return URIRef(new_value), old_value, ""
        else:
            # Preserve the datatype of the old value if it's a Literal
            if (
                old_value is not None
                and isinstance(old_value, Literal)
                and old_value.datatype
            ):
                return Literal(new_value, datatype=old_value.datatype), old_value, ""
            else:
                return Literal(new_value), old_value, ""

    s_types = [
        triple[2] for triple in data_graph.triples((URIRef(subject), RDF.type, None))
    ]
    highest_priority_class = get_highest_priority_class(s_types)

    if entity_types and not s_types:
        if isinstance(entity_types, list):
            s_types = entity_types
        else:
            s_types = [entity_types]

    # Get types for entities that have this subject as their object.
    # This is crucial for proper SHACL validation in cases where constraints depend on the context.
    # Example: when validating an identifier's value (e.g., DOI, ISSN, ORCID):
    #   - The identifier itself is of type datacite:Identifier
    #   - But its format constraints depend on what owns it:
    #       * A DOI for an article follows one pattern
    #       * An ISSN for a journal follows another
    #       * An ORCID for a person follows yet another
    # By including these "inverse" types, we ensure validation considers the full context.
    inverse_types = []
    for s, p, o in data_graph.triples((None, None, URIRef(subject))):
        # Get the types of the entity that has the subject as its object
        s_types_inverse = [t[2] for t in data_graph.triples((s, RDF.type, None))]
        inverse_types.extend(s_types_inverse)

    # Add inverse types to s_types
    s_types.extend(inverse_types)

    query = f"""
        PREFIX sh: <http://www.w3.org/ns/shacl#>
        SELECT DISTINCT ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
            (GROUP_CONCAT(DISTINCT COALESCE(?optionalValue, ""); separator=",") AS ?optionalValues)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionPath, ""); separator=",") AS ?conditionPaths)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionValue, ""); separator=",") AS ?conditionValues)
        WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?propertyShape .
            ?propertyShape sh:path ?path .
            FILTER(?path = <{predicate}>)
            VALUES ?type {{<{'> <'.join(s_types)}>}}
            OPTIONAL {{?propertyShape sh:datatype ?datatype .}}
            OPTIONAL {{?propertyShape sh:maxCount ?maxCount .}}
            OPTIONAL {{?propertyShape sh:minCount ?minCount .}}
            OPTIONAL {{?propertyShape sh:class ?a_class .}}
            OPTIONAL {{
                ?propertyShape sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
                OPTIONAL {{?orConstraint sh:class ?class .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:classIn ?classInList .
                ?classInList rdf:rest*/rdf:first ?classIn .
            }}
            OPTIONAL {{
                ?propertyShape sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?propertyShape sh:pattern ?pattern .
                OPTIONAL {{?propertyShape sh:message ?message .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:condition ?conditionNode .
                ?conditionNode sh:path ?conditionPath ;
                               sh:hasValue ?conditionValue .
            }}
        }}
        GROUP BY ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message ?shape
    """
    shacl = get_shacl_graph()
    custom_filter = get_custom_filter()
    results = shacl.query(query)

    # Convert results to list to properly check if there are any results
    # SPARQL iterators can be misleading about their emptiness
    results_list = list(results)
    property_exists = [row.path for row in results_list]
    shapes = [row.shape for row in results_list if row.shape is not None]
    current_shape = shapes[0] if shapes else None
    if not property_exists:
        if not s_types:
            return (
                None,
                old_value,
                gettext(
                    "No entity type specified"
                ),
            )

        # If there are no shapes defined for this class, everything is allowed
        # Behave as if there is no SHACL
        if validators.url(new_value):
            return URIRef(new_value), old_value, ""
        else:
            # Preserve the datatype of the old value if it's a Literal
            if (
                old_value is not None
                and isinstance(old_value, Literal)
                and old_value.datatype
            ):
                return Literal(new_value, datatype=old_value.datatype), old_value, ""
            else:
                return Literal(new_value, datatype=XSD.string), old_value, ""

    datatypes = [row.datatype for row in results_list if row.datatype is not None]
    classes = [row.a_class for row in results_list if row.a_class]
    classes.extend([row.classIn for row in results_list if row.classIn])
    optional_values_str = [row.optionalValues for row in results_list if row.optionalValues]
    optional_values_str = optional_values_str[0] if optional_values_str else ""
    optional_values = [value for value in optional_values_str.split(",") if value]

    max_count = [row.maxCount for row in results_list if row.maxCount]
    min_count = [row.minCount for row in results_list if row.minCount]
    max_count = int(max_count[0]) if max_count else None
    min_count = int(min_count[0]) if min_count else None

    current_values = list(
        data_graph.triples((URIRef(subject), URIRef(predicate), None))
    )
    current_count = len(current_values)

    if action == "create":
        new_count = current_count + 1
    elif action == "delete":
        new_count = current_count - 1
    else:  # update
        new_count = current_count

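    # Worked example (hypothetical numbers): with sh:maxCount 1 on the predicate and one
    # value already stored, current_count == 1; a "create" yields new_count == 2, which
    # fails the max-count check below, whereas an "update" keeps new_count == 1 and passes.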

    if max_count is not None and new_count > max_count:
        value = gettext("value") if max_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s allows at most %(max_count)s %(value)s",
                predicate=custom_filter.human_readable_predicate(predicate, (highest_priority_class, current_shape)),
                max_count=max_count,
                value=value,
            ),
        )
    if min_count is not None and new_count < min_count:
        value = gettext("value") if min_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s requires at least %(min_count)s %(value)s",
                predicate=custom_filter.human_readable_predicate(predicate, (highest_priority_class, current_shape)),
                min_count=min_count,
                value=value,
            ),
        )

    # For delete operations, we only need to validate cardinality constraints (which we've already done)
    # No need to validate the datatype or class of the value being deleted
    if action == "delete":
        return None, old_value, ""

    if optional_values and new_value not in optional_values:
        optional_value_labels = [
            custom_filter.human_readable_predicate(value, (highest_priority_class, current_shape))
            for value in optional_values
        ]
        return (
            None,
            old_value,
            gettext(
                "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires one of the following values: %(o_values)s",
                new_value=custom_filter.human_readable_predicate(new_value, (highest_priority_class, current_shape)),
                property=custom_filter.human_readable_predicate(predicate, (highest_priority_class, current_shape)),
                o_values=", ".join(
                    [f"<code>{label}</code>" for label in optional_value_labels]
                ),
            ),
        )

    # Check pattern constraints
    for row in results_list:
        if row.pattern:
            # Check if there are conditions for this pattern
            condition_paths = row.conditionPaths.split(",") if row.conditionPaths else []
            condition_values = row.conditionValues.split(",") if row.conditionValues else []
            conditions_met = True

            # If there are conditions, check if they are met
            for path, value in zip(condition_paths, condition_values):
                if path and value:
                    # Check if the condition triple exists in the data graph
                    condition_exists = any(
                        data_graph.triples((URIRef(subject), URIRef(path), URIRef(value)))
                    )
                    if not condition_exists:
                        conditions_met = False
                        break

            # Only validate pattern if conditions are met
            if conditions_met:
                pattern = str(row.pattern)
                if not re.match(pattern, new_value):
                    error_message = str(row.message) if row.message else f"Value must match pattern: {pattern}"
                    return None, old_value, error_message

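    # Example of a condition-scoped pattern (hypothetical shape): a property shape could pair
    # sh:pattern "^10\\..*" with sh:condition {sh:path datacite:usesIdentifierScheme;
    # sh:hasValue datacite:doi}, so the DOI-like regex above is only enforced when the
    # subject actually uses the DOI identifier scheme.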

    if classes:
        if not validators.url(new_value):
            return (
                None,
                old_value,
                gettext(
                    "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
                    new_value=custom_filter.human_readable_predicate(new_value, (highest_priority_class, current_shape)),
                    property=custom_filter.human_readable_predicate(predicate, (highest_priority_class, current_shape)),
                    o_types=", ".join(
                        [
                            f"<code>{custom_filter.human_readable_class((c, current_shape))}</code>"
                            for c in classes
                        ]
                    ),
                ),
            )
        valid_value = convert_to_matching_class(
            new_value, classes, entity_types=s_types
        )
        if valid_value is None:
            return (
                None,
                old_value,
                gettext(
                    "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
                    new_value=custom_filter.human_readable_predicate(new_value, (highest_priority_class, current_shape)),
                    property=custom_filter.human_readable_predicate(predicate, (highest_priority_class, current_shape)),
                    o_types=", ".join(
                        [
                            f"<code>{custom_filter.human_readable_class((c, current_shape))}</code>"
                            for c in classes
                        ]
                    ),
                ),
            )
        return valid_value, old_value, ""
    elif datatypes:
        valid_value = convert_to_matching_literal(new_value, datatypes)
        if valid_value is None:
            datatype_labels = [get_datatype_label(dt) for dt in datatypes]
            return (
                None,
                old_value,
                gettext(
                    "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
                    new_value=custom_filter.human_readable_predicate(new_value, (highest_priority_class, current_shape)),
                    property=custom_filter.human_readable_predicate(predicate, (highest_priority_class, current_shape)),
                    o_types=", ".join(
                        [f"<code>{label}</code>" for label in datatype_labels]
                    ),
                ),
            )
        return valid_value, old_value, ""
    # If no datatypes or classes are specified, determine the type from old_value and new_value
    if isinstance(old_value, Literal):
        if old_value.datatype:
            valid_value = Literal(new_value, datatype=old_value.datatype)
        else:
            valid_value = Literal(new_value, datatype=XSD.string)
    elif isinstance(old_value, URIRef):
        # If old_value is a URIRef but new_value is None, return old_value
        if new_value is None:
            return old_value, old_value, ""
        valid_value = URIRef(new_value)
    elif new_value is not None and validators.url(new_value):
        valid_value = URIRef(new_value)
    else:
        valid_value = Literal(new_value, datatype=XSD.string)
    return valid_value, old_value, ""

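
# Illustrative usage sketch: validating a single edit with validate_new_triple. The subject,
# predicate, and values below are hypothetical, and the call assumes a Flask application
# context with both the data graph and the SHACL graph available.
def _sketch_validate_new_triple():
    valid_value, old_value, error = validate_new_triple(
        subject="https://example.org/entity/1",      # hypothetical entity
        predicate="http://purl.org/dc/terms/title",  # predicate being edited
        new_value="A corrected title",
        action="update",
        old_value="An example title",
    )
    if error:
        # A non-empty error string means the edit violates a SHACL constraint
        # (cardinality, sh:in, sh:pattern, sh:class, or sh:datatype).
        return error
    # Otherwise valid_value is an rdflib URIRef or Literal with the datatype already applied.
    return valid_value, old_value
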

def convert_to_matching_class(object_value, classes, entity_types=None):
    # Handle edge cases
    if not classes or object_value is None:
        return None

    # Check if the value is a valid URI
    if not validators.url(str(object_value)):
        return None

    # Fetch data graph and get types
    data_graph = fetch_data_graph_for_subject(object_value)
    o_types = {str(c[2]) for c in data_graph.triples((URIRef(object_value), RDF.type, None))}

    # If entity_types is provided and o_types is empty, use entity_types
    if entity_types and not o_types:
        if isinstance(entity_types, list):
            o_types = set(entity_types)
        else:
            o_types = {entity_types}

    # Convert classes to strings for comparison
    classes_str = {str(c) for c in classes}

    # Check if any of the object types match the required classes
    if o_types.intersection(classes_str):
        return URIRef(object_value)

    # Special case for the test with entity_types parameter
    if entity_types and not o_types.intersection(classes_str):
        return URIRef(object_value)

    return None

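
# Illustrative sketch: convert_to_matching_class returns a URIRef only when the referenced
# entity's rdf:type values intersect the allowed classes (or when entity_types is supplied).
# The object URI and the allowed class below are hypothetical, and resolving the entity's
# types requires an application context for fetch_data_graph_for_subject.
def _sketch_convert_to_matching_class():
    return convert_to_matching_class(
        "https://example.org/agent/1",                # hypothetical object URI
        [URIRef("http://xmlns.com/foaf/0.1/Agent")],  # allowed sh:class values
    )
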

def convert_to_matching_literal(object_value, datatypes):
    # Handle edge cases
    if not datatypes or object_value is None:
        return None

    for datatype in datatypes:
        validation_func = next(
            (d[1] for d in DATATYPE_MAPPING if str(d[0]) == str(datatype)), None
        )
        if validation_func is None:
            return Literal(object_value, datatype=XSD.string)
        is_valid_datatype = validation_func(object_value)
        if is_valid_datatype:
            return Literal(object_value, datatype=datatype)

    return None

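
# Illustrative sketch: convert_to_matching_literal tries each allowed datatype in order and
# returns a Literal typed with the first datatype whose validator in DATATYPE_MAPPING accepts
# the value (unknown datatypes fall back to xsd:string; None means no datatype matched).
# Assuming DATATYPE_MAPPING provides an xsd:integer validator, "42" would come back as
# Literal("42", datatype=XSD.integer) and "forty-two" as None.
def _sketch_convert_to_matching_literal():
    return (
        convert_to_matching_literal("42", [XSD.integer]),
        convert_to_matching_literal("forty-two", [XSD.integer]),
    )
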

def get_datatype_label(datatype_uri):
    if datatype_uri is None:
        return None

    # Map common XSD datatypes to human-readable labels
    datatype_labels = {
        str(XSD.string): "String",
        str(XSD.integer): "Integer",
        str(XSD.int): "Integer",
        str(XSD.float): "Float",
        str(XSD.double): "Double",
        str(XSD.decimal): "Decimal",
        str(XSD.boolean): "Boolean",
        str(XSD.date): "Date",
        str(XSD.time): "Time",
        str(XSD.dateTime): "DateTime",
        str(XSD.anyURI): "URI"
    }

    # Check if the datatype is in our mapping
    if str(datatype_uri) in datatype_labels:
        return datatype_labels[str(datatype_uri)]

    # If not in our mapping, check DATATYPE_MAPPING
    for dt_uri, _, dt_label in DATATYPE_MAPPING:
        if str(dt_uri) == str(datatype_uri):
            return dt_label

    # If not found in either mapping, fall back to the custom filter;
    # without a custom filter, return the URI as-is
    custom_filter = get_custom_filter()
    if custom_filter:
        custom_label = custom_filter.human_readable_predicate(datatype_uri, (None, None))
        # If the custom filter returns just the last part of the URI, return the full URI instead
        if custom_label and custom_label != datatype_uri and datatype_uri.endswith(custom_label):
            return datatype_uri
        return custom_label
    return datatype_uri
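

# Illustrative sketch: get_datatype_label prefers the built-in XSD label table above, then the
# labels in DATATYPE_MAPPING, and only then falls back to the custom filter (which needs an
# application context). The lookup below uses only the built-in table.
def _sketch_get_datatype_label():
    return get_datatype_label(str(XSD.dateTime))  # expected to resolve to "DateTime"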