Coverage for heritrace/utils/shacl_utils.py: 94%

461 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-18 11:10 +0000

1import re 

2from collections import OrderedDict, defaultdict 

3from typing import List 

4 

5import validators 

6from flask_babel import gettext 

7from heritrace.extensions import get_custom_filter, get_shacl_graph 

8from heritrace.utils.display_rules_utils import get_highest_priority_class 

9from heritrace.utils.sparql_utils import fetch_data_graph_for_subject 

10from rdflib import RDF, XSD, Graph, Literal, URIRef 

11from rdflib.plugins.sparql import prepareQuery 

12from resources.datatypes import DATATYPE_MAPPING 

13 

# Prefixes made available to the shared SHACL property query below.
_COMMON_QUERY_NAMESPACES = {
    "sh": "http://www.w3.org/ns/shacl#",
    "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
}

# One row per (shape, target class, property path) with the optional
# constraints attached to the property shape; sh:in values and sh:or node
# alternatives are folded into comma-separated strings.
_COMMON_QUERY_TEXT = """
    SELECT ?shape ?type ?predicate ?nodeShape ?datatype ?maxCount ?minCount ?hasValue ?objectClass 
           ?conditionPath ?conditionValue ?pattern ?message
           (GROUP_CONCAT(?optionalValue; separator=",") AS ?optionalValues)
           (GROUP_CONCAT(?orNode; separator=",") AS ?orNodes)
    WHERE {
        ?shape sh:targetClass ?type ;
               sh:property ?property .
        ?property sh:path ?predicate .
        OPTIONAL {
            ?property sh:node ?nodeShape .
            OPTIONAL {?nodeShape sh:targetClass ?objectClass .}
        }
        OPTIONAL {
            ?property sh:or ?orList .
            {
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
            } UNION {
                ?orList rdf:rest*/rdf:first ?orNodeShape .
                ?orNodeShape sh:node ?orNode .
            }
        }
        OPTIONAL { ?property sh:datatype ?datatype . }
        OPTIONAL { ?property sh:maxCount ?maxCount . }
        OPTIONAL { ?property sh:minCount ?minCount . }
        OPTIONAL { ?property sh:hasValue ?hasValue . }
        OPTIONAL {
            ?property sh:in ?list .
            ?list rdf:rest*/rdf:first ?optionalValue .
        }
        OPTIONAL {
            ?property sh:condition ?conditionNode .
            ?conditionNode sh:path ?conditionPath ;
                           sh:hasValue ?conditionValue .
        }
        OPTIONAL { ?property sh:pattern ?pattern . }
        OPTIONAL { ?property sh:message ?message . }
        FILTER (isURI(?predicate))
    }
    GROUP BY ?shape ?type ?predicate ?nodeShape ?datatype ?maxCount ?minCount ?hasValue 
             ?objectClass ?conditionPath ?conditionValue ?pattern ?message
"""

COMMON_SPARQL_QUERY = prepareQuery(
    _COMMON_QUERY_TEXT,
    initNs=_COMMON_QUERY_NAMESPACES,
)

63 

64 

def get_form_fields_from_shacl(shacl: Graph, display_rules: List[dict]):
    """
    Derive the form fields of every entity type from the SHACL shapes.

    Args:
        shacl (Graph): The SHACL graph; a falsy value yields an empty dict.
        display_rules (List[dict]): The display rules configuration.

    Returns:
        dict: Entity types mapped to dictionaries of predicate -> list of
        field descriptions, as produced by ``order_form_fields``.
    """
    if not shacl:
        return dict()

    # Step 1: initial fields straight from the SHACL shapes
    fields_by_type = extract_shacl_form_fields(shacl, display_rules)

    # Step 2: (re)compute the nested shape of every field that points to one
    visited_shapes = set()
    for predicates in fields_by_type.values():
        for field_list in predicates.values():
            for field_info in field_list:
                node_shape = field_info.get("nodeShape")
                if not node_shape:
                    continue
                field_info["nestedShape"] = process_nested_shapes(
                    shacl,
                    display_rules,
                    node_shape,
                    processed_shapes=visited_shapes,
                )

    # Step 3: enrich the fields with the display rules, when there are any
    if display_rules:
        fields_by_type = apply_display_rules(shacl, fields_by_type, display_rules)

    # Step 4: order the fields as dictated by the display rules
    return order_form_fields(fields_by_type, display_rules)

100 

101 

def extract_shacl_form_fields(shacl, display_rules):
    """
    Run the shared SHACL query and turn its rows into form fields.

    Args:
        shacl: The SHACL graph; a falsy value yields an empty dict.
        display_rules: The display rules configuration.

    Returns:
        The mapping built by ``process_query_results`` (entity type ->
        predicate -> list of field descriptions), or ``{}`` without a graph.
    """
    if not shacl:
        return dict()

    query_rows = execute_shacl_query(shacl, COMMON_SPARQL_QUERY)
    # A fresh set tracks the shapes being expanded during this extraction.
    return process_query_results(shacl, query_rows, display_rules, set(), depth=0)

119 

120 

def execute_shacl_query(shacl: Graph, query, init_bindings=None):
    """
    Run a prepared SPARQL query against the SHACL graph.

    Args:
        shacl (Graph): The SHACL graph to query.
        query: The prepared SPARQL query.
        init_bindings (dict): Optional initial variable bindings.

    Returns:
        The result object returned by ``shacl.query``.
    """
    # Only forward initBindings when the caller actually supplied some.
    if not init_bindings:
        return shacl.query(query)
    return shacl.query(query, initBindings=init_bindings)

137 

138 

def get_display_name_for_shape(entity_type, property_uri, shape_uri, display_rules):
    """
    Look up the displayName configured for a shape of a given class property.

    Args:
        entity_type (str): The class the display rule must target.
        property_uri (str): The property the display rule must describe.
        shape_uri (str): The shape to find among the property's displayRules.
        display_rules (list): The display rules configuration (may be empty/None).

    Returns:
        str: The ``displayName`` of the first matching shape rule, or None
        when nothing matches.
    """
    for class_rule in display_rules or ():
        if class_rule.get("class") != entity_type:
            continue
        for property_rule in class_rule.get("displayProperties", []):
            if property_rule.get("property") != property_uri:
                continue
            for shape_rule in property_rule.get("displayRules", []):
                if shape_rule.get("shape") == shape_uri:
                    return shape_rule.get("displayName")
    return None

165 

166 

def _build_or_field_info(
    shacl,
    display_rules,
    processed_shapes,
    depth,
    node,
    parent_entity_type,
    predicate,
    subject_shape,
    min_count,
    max_count,
    has_value,
    object_class,
    optional_values,
    condition_entry,
):
    """Build the field description for one node-shape alternative of an sh:or list."""
    or_field_info = {
        "entityType": get_shape_target_class(shacl, node),
        "uri": predicate,
        "displayName": get_display_name_for_shape(
            parent_entity_type, predicate, node, display_rules
        ),
        "subjectShape": subject_shape,
        "nodeShape": node,
        "min": min_count,
        "max": max_count,
        "hasValue": has_value,
        "objectClass": object_class,
        "optionalValues": optional_values,
        "conditions": [condition_entry] if condition_entry else [],
    }
    # Shapes currently being expanded are skipped to avoid infinite recursion.
    if node not in processed_shapes:
        or_field_info["nestedShape"] = process_nested_shapes(
            shacl,
            display_rules,
            node,
            depth=depth + 1,
            processed_shapes=processed_shapes,
        )
    return or_field_info


def process_query_results(shacl, results, display_rules, processed_shapes, depth=0):
    """
    Convert the rows of ``COMMON_SPARQL_QUERY`` into form-field descriptions.

    Rows that describe the same field (same node shapes, subject shape,
    hasValue, object class, cardinalities and optional values) are merged:
    their datatypes, conditions and sh:or alternatives are accumulated on the
    first field created for them.

    Args:
        shacl: The SHACL graph, used to resolve sh:or alternatives and nested shapes.
        results: Iterable of query rows exposing the variables selected by
            ``COMMON_SPARQL_QUERY`` as attributes.
        display_rules: The display rules configuration.
        processed_shapes (set): Shapes currently being expanded.
        depth (int): Current nesting depth, forwarded (+1) to nested shapes.

    Returns:
        defaultdict: entity type -> predicate -> list of field-info dicts.
    """
    form_fields = defaultdict(dict)
    for row in results:
        subject_shape = str(row.shape)
        entity_type = str(row.type)
        predicate = str(row.predicate)
        node_shape = str(row.nodeShape) if row.nodeShape else None
        has_value = str(row.hasValue) if row.hasValue else None
        object_class = str(row.objectClass) if row.objectClass else None
        min_count = 0 if row.minCount is None else int(row.minCount)
        max_count = None if row.maxCount is None else int(row.maxCount)
        datatype = str(row.datatype) if row.datatype else None
        # GROUP_CONCAT yields a comma-separated string; empty chunks are dropped.
        optional_values = [v for v in (row.optionalValues or "").split(",") if v]
        or_nodes = [v for v in (row.orNodes or "").split(",") if v]

        condition_entry = {}
        if row.conditionPath and row.conditionValue:
            condition_entry["condition"] = {
                "path": str(row.conditionPath),
                "value": str(row.conditionValue),
            }
        if row.pattern:
            condition_entry["pattern"] = str(row.pattern)
        if row.message:
            condition_entry["message"] = str(row.message)

        fields_for_predicate = form_fields[entity_type].setdefault(predicate, [])

        node_shapes = []
        if node_shape:
            node_shapes.append(node_shape)
        node_shapes.extend(or_nodes)

        existing_field = next(
            (
                field
                for field in fields_for_predicate
                if field.get("nodeShape") == node_shape
                and field.get("nodeShapes") == node_shapes
                and field.get("subjectShape") == subject_shape
                and field.get("hasValue") == has_value
                and field.get("objectClass") == object_class
                and field.get("min") == min_count
                and field.get("max") == max_count
                and field.get("optionalValues") == optional_values
            ),
            None,
        )

        if existing_field is not None:
            if datatype and datatype not in existing_field.get("datatypes", []):
                existing_field.setdefault("datatypes", []).append(datatype)
            if condition_entry:
                existing_field.setdefault("conditions", []).append(condition_entry)
            if or_nodes:
                existing_field.setdefault("or", [])
                for node in or_nodes:
                    existing_field["or"].append(
                        _build_or_field_info(
                            shacl,
                            display_rules,
                            processed_shapes,
                            depth,
                            node=node,
                            parent_entity_type=entity_type,
                            predicate=predicate,
                            subject_shape=subject_shape,
                            min_count=min_count,
                            max_count=max_count,
                            has_value=has_value,
                            # When merging into an existing field the object
                            # class is resolved from the alternative itself.
                            object_class=get_object_class(shacl, node, predicate),
                            optional_values=optional_values,
                            condition_entry=condition_entry,
                        )
                    )
            continue

        field_info = {
            "entityType": entity_type,
            "uri": predicate,
            "nodeShape": node_shape,
            "nodeShapes": node_shapes,
            "subjectShape": subject_shape,
            "datatypes": [datatype] if datatype else [],
            "min": min_count,
            "max": max_count,
            "hasValue": has_value,
            "objectClass": object_class,
            "optionalValues": optional_values,
            "conditions": [condition_entry] if condition_entry else [],
            "inputType": determine_input_type(datatype),
        }

        if node_shape and node_shape not in processed_shapes:
            field_info["nestedShape"] = process_nested_shapes(
                shacl,
                display_rules,
                node_shape,
                depth=depth + 1,
                processed_shapes=processed_shapes,
            )

        if or_nodes:
            # NOTE(review): for a newly created field the alternatives carry
            # the row's own objectClass, whereas the merge branch above
            # resolves it via get_object_class. The asymmetry is kept as-is;
            # confirm which of the two is intended.
            field_info["or"] = [
                _build_or_field_info(
                    shacl,
                    display_rules,
                    processed_shapes,
                    depth,
                    node=node,
                    parent_entity_type=entity_type,
                    predicate=predicate,
                    subject_shape=subject_shape,
                    min_count=min_count,
                    max_count=max_count,
                    has_value=has_value,
                    object_class=object_class,
                    optional_values=optional_values,
                    condition_entry=condition_entry,
                )
                for node in or_nodes
            ]

        fields_for_predicate.append(field_info)

    return form_fields

313 

314 

def get_shape_target_class(shacl, shape_uri):
    """
    Return the sh:targetClass of a shape.

    Args:
        shacl: The SHACL graph to query.
        shape_uri (str): The URI of the shape.

    Returns:
        str: The first target class found for the shape, or None.
    """
    target_class_query = prepareQuery(
        """
        SELECT ?targetClass
        WHERE {
            ?shape sh:targetClass ?targetClass .
        }
        """,
        initNs={"sh": "http://www.w3.org/ns/shacl#"},
    )
    bindings = {"shape": URIRef(shape_uri)}
    rows = execute_shacl_query(shacl, target_class_query, bindings)
    return next((str(row.targetClass) for row in rows), None)

329 

330 

def get_object_class(shacl, shape_uri, predicate_uri):
    """
    Find the class of the objects a shape's property points to.

    The class is looked up through sh:node (via the node shape's
    sh:targetClass), through sh:class, or through either of those inside an
    sh:or list of the property shape.

    Args:
        shacl: The SHACL graph to query.
        shape_uri (str): The URI of the shape owning the property.
        predicate_uri (str): The URI of the property path.

    Returns:
        str: The first non-empty target class found, or None.
    """
    object_class_query = prepareQuery(
        """
        SELECT DISTINCT ?targetClass
        WHERE {
            ?shape sh:property ?propertyShape .
            ?propertyShape sh:path ?predicate .
            {
                # Caso 1: definizione diretta con sh:node
                ?propertyShape sh:node ?nodeShape .
                ?nodeShape sh:targetClass ?targetClass .
            } UNION {
                # Caso 2: definizione diretta con sh:class
                ?propertyShape sh:class ?targetClass .
            } UNION {
                # Caso 3: definizione con sh:or che include node shapes
                ?propertyShape sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?choice .
                {
                    ?choice sh:node ?nodeShape .
                    ?nodeShape sh:targetClass ?targetClass .
                } UNION {
                    ?choice sh:class ?targetClass .
                }
            }
        }
        """,
        initNs={
            "sh": "http://www.w3.org/ns/shacl#",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        },
    )

    bindings = {"shape": URIRef(shape_uri), "predicate": URIRef(predicate_uri)}
    rows = execute_shacl_query(shacl, object_class_query, bindings)

    # The first row with a bound class wins.
    return next((str(row.targetClass) for row in rows if row.targetClass), None)

373 

374 

def process_nested_shapes(
    shacl, display_rules, shape_uri, depth=0, processed_shapes=None
):
    """
    Recursively collect the fields of a nested shape.

    Args:
        shacl: The SHACL graph to query.
        display_rules: The display rules configuration.
        shape_uri (str): The URI of the shape to expand.
        depth (int): The current recursion depth.
        processed_shapes (set): Shapes currently being expanded; a shape
            already in the set is not expanded again.

    Returns:
        list: The field descriptions of the nested shape (empty when the
        shape is already being expanded).
    """
    in_progress = set() if processed_shapes is None else processed_shapes
    if shape_uri in in_progress:
        return []

    in_progress.add(shape_uri)
    rows = execute_shacl_query(
        shacl, COMMON_SPARQL_QUERY, {"shape": URIRef(shape_uri)}
    )
    fields_by_type = process_query_results(
        shacl, rows, display_rules, in_progress, depth
    )

    # Nested fields get the same display-rule treatment as top-level ones.
    if display_rules:
        fields_by_type = apply_display_rules(shacl, fields_by_type, display_rules)
    fields_by_type = order_form_fields(fields_by_type, display_rules)

    # Flatten entity type -> predicate -> fields into a single list.
    collected_fields = []
    for predicates in fields_by_type.values():
        for field_list in predicates.values():
            collected_fields.extend(field_list)

    # The shape is only "in progress" for the duration of its own expansion.
    in_progress.remove(shape_uri)
    return collected_fields

416 

417 

def get_property_order(entity_type, display_rules):
    """
    Read the property order of an entity type from the display rules.

    Args:
        entity_type (str): The URI of the entity type.
        display_rules (list): The display rules configuration.

    Returns:
        list: The ``propertyOrder`` of the first rule for the class, or, when
        that rule has none, the properties listed in its ``displayProperties``.
        An empty list when no rule targets the class.
    """
    if not display_rules:
        return []

    for rule in display_rules:
        if rule.get("class") != entity_type:
            continue
        # Only the first rule for the class is considered.
        if "propertyOrder" in rule:
            return rule["propertyOrder"]
        return [entry["property"] for entry in rule.get("displayProperties", [])]
    return []

437 

438 

def order_fields(fields, property_order):
    """
    Sort field descriptions according to a list of property URIs.

    Args:
        fields (list): The field dictionaries to sort.
        property_order (list): Property URIs in the desired order.

    Returns:
        list: A new sorted list; fields whose property is not listed come
        last. ``fields`` itself is returned when no order is given, and an
        empty list when there are no fields.
    """
    if not fields:
        return []
    if not property_order:
        return fields

    # Position of each property in the requested order (last occurrence wins).
    rank_by_property = {}
    for position, property_uri in enumerate(property_order):
        rank_by_property[property_uri] = position

    def sort_rank(field):
        # A field is identified by "predicate" when present, else by "uri".
        identifier = field.get("predicate", field.get("uri", ""))
        return rank_by_property.get(identifier, float("inf"))

    return sorted(fields, key=sort_rank)

464 

465 

def _apply_nested_display_rules(field_info, prop):
    """Apply the nestedDisplayRules that ``prop`` declares for the field's own node shape."""
    nested_fields = field_info.get("nestedShape")
    node_shape = field_info.get("nodeShape")
    if not nested_fields or not node_shape:
        return
    # apply_display_rules_to_nested_shapes works on copies, so the result
    # has to be stored back for the nested rules to have any effect.
    field_info["nestedShape"] = apply_display_rules_to_nested_shapes(
        nested_fields, prop, node_shape
    )


def apply_display_rules(shacl, form_fields, display_rules):
    """
    Apply the display rules to the form fields.

    For every rule whose class is present in ``form_fields``, each configured
    property gets its display information, the nested display rules of the
    matching shape (both for the field itself and for its sh:or
    alternatives), the intermediate relation data and, last, the per-shape
    sub display rules.

    Args:
        shacl: The SHACL graph, forwarded to the helpers that query it.
        form_fields (dict): Fields extracted from the SHACL shapes
            (entity type -> predicate -> list of field-info dicts).
        display_rules (list): The display rules configuration.

    Returns:
        dict: ``form_fields`` itself, updated in place.
    """
    for rule in display_rules:
        entity_class = rule.get("class")
        if not entity_class or entity_class not in form_fields:
            continue
        class_fields = form_fields[entity_class]

        for prop in rule.get("displayProperties", []):
            prop_uri = prop["property"]
            if prop_uri not in class_fields:
                continue

            for field_info in class_fields[prop_uri]:
                add_display_information(field_info, prop)
                # The shape to match is the field's own node shape; passing
                # the whole display_rules list here never matched anything.
                _apply_nested_display_rules(field_info, prop)
                for or_field in field_info.get("or", []):
                    _apply_nested_display_rules(or_field, prop)
                if "intermediateRelation" in prop:
                    handle_intermediate_relation(shacl, field_info, prop)

            if "displayRules" in prop:
                handle_sub_display_rules(
                    shacl,
                    form_fields,
                    entity_class,
                    class_fields[prop_uri],
                    prop,
                )
    return form_fields

505 

506 

def apply_display_rules_to_nested_shapes(nested_fields, parent_prop, shape_uri):
    """
    Apply a parent property's nested display rules to copies of nested fields.

    Args:
        nested_fields (list): The nested field dictionaries.
        parent_prop (dict): The parent property configuration; its
            ``displayRules`` entries are matched on ``shape``.
        shape_uri: The shape whose ``nestedDisplayRules`` should be applied.

    Returns:
        list: Shallow copies of the fields with the matching rules applied;
        ``[]`` for empty input, and ``nested_fields`` unchanged when
        ``parent_prop`` is not a dictionary.
    """
    if not nested_fields:
        return []
    if not isinstance(parent_prop, dict):
        return nested_fields

    # Work on shallow copies so the caller's dictionaries stay untouched.
    updated_fields = [field.copy() for field in nested_fields]

    for shape_rule in parent_prop.get("displayRules", []):
        if shape_rule.get("shape") != shape_uri:
            continue
        if "nestedDisplayRules" not in shape_rule:
            continue
        for field in updated_fields:
            # A field is identified by "predicate" when present, else by "uri".
            field_key = field.get("predicate", field.get("uri"))
            for nested_rule in shape_rule["nestedDisplayRules"]:
                if field_key != nested_rule["property"]:
                    continue
                field.update(
                    {key: value for key, value in nested_rule.items() if key != "property"}
                )
                # Only the first nested rule for a field is applied.
                break

    return updated_fields

541 

542 

# HTML input type used for each supported XSD datatype.
_INPUT_TYPE_BY_DATATYPE = {
    "http://www.w3.org/2001/XMLSchema#string": "text",
    "http://www.w3.org/2001/XMLSchema#integer": "number",
    "http://www.w3.org/2001/XMLSchema#decimal": "number",
    "http://www.w3.org/2001/XMLSchema#float": "number",
    "http://www.w3.org/2001/XMLSchema#double": "number",
    "http://www.w3.org/2001/XMLSchema#boolean": "checkbox",
    "http://www.w3.org/2001/XMLSchema#date": "date",
    "http://www.w3.org/2001/XMLSchema#time": "time",
    "http://www.w3.org/2001/XMLSchema#dateTime": "datetime-local",
    "http://www.w3.org/2001/XMLSchema#anyURI": "url",
    "http://www.w3.org/2001/XMLSchema#email": "email",
}


def determine_input_type(datatype):
    """
    Pick the form input type matching an XSD datatype.

    Args:
        datatype: The datatype URI (anything ``str()`` can render), or a
            falsy value.

    Returns:
        str: The input type for the datatype; ``"text"`` when the datatype
        is missing or not in the mapping.
    """
    if not datatype:
        return "text"
    try:
        return _INPUT_TYPE_BY_DATATYPE[str(datatype)]
    except KeyError:
        return "text"

565 

566 

def add_display_information(field_info, prop):
    """
    Copy the display settings of a display-rule property onto a field.

    Only the settings actually present in ``prop`` are copied; the field is
    updated in place.

    Args:
        field_info (dict): The field description to update.
        prop (dict): The property entry taken from the display rules.
    """
    copied_settings = (
        "displayName",
        "shouldBeDisplayed",
        "orderedBy",
        "inputType",
        "supportsSearch",
        "minCharsForSearch",
        "searchTarget",
    )
    for setting in copied_settings:
        if setting in prop:
            field_info[setting] = prop[setting]

589 

590 

def handle_intermediate_relation(shacl, field_info, prop):
    """
    Resolve the 'intermediateRelation' of a display rule and store it on the field.

    The SHACL graph is searched for the property that links the intermediate
    class to the target entity type; the fields nested under that property
    are then grouped by URI and attached to ``field_info``.

    Args:
        shacl: The SHACL graph to query.
        field_info (dict): The field description to update in place.
        prop (dict): The property entry from the display rules; must contain
            ``intermediateRelation``.
    """
    relation_config = prop["intermediateRelation"]
    target_entity_type = relation_config.get("targetEntityType")
    intermediate_class = relation_config.get("class")

    # SPARQL query locating the property that connects the two classes
    linking_query = prepareQuery(
        """
        SELECT ?property
        WHERE {
            ?shape sh:targetClass ?intermediateClass ;
                   sh:property ?propertyShape .
            ?propertyShape sh:path ?property ;
                           sh:node ?targetNode .
            ?targetNode sh:targetClass ?targetClass.
        }
        """,
        initNs={"sh": "http://www.w3.org/ns/shacl#"},
    )
    linking_rows = shacl.query(
        linking_query,
        initBindings={
            "intermediateClass": URIRef(intermediate_class),
            "targetClass": URIRef(target_entity_type),
        },
    )

    # Only the first connecting property found is used.
    connecting_property = None
    for row in linking_rows:
        connecting_property = str(row.property)
        break

    # Group the fields nested under the connecting property by their URI.
    properties_by_uri = {}
    if "nestedShape" in field_info:
        for nested_field in field_info["nestedShape"]:
            if nested_field.get("uri") != connecting_property:
                continue
            if "nestedShape" not in nested_field:
                continue
            for target_field in nested_field["nestedShape"]:
                target_uri = target_field.get("uri")
                if target_uri:
                    properties_by_uri.setdefault(target_uri, []).append(target_field)

    field_info["intermediateRelation"] = {
        "class": intermediate_class,
        "targetEntityType": target_entity_type,
        "connectingProperty": connecting_property,
        "properties": properties_by_uri,
    }

650 

651 

def handle_sub_display_rules(shacl, form_fields, entity_class, field_info_list, prop):
    """
    Apply the per-shape 'displayRules' of a property to its fields.

    Each field whose node shape matches the ``shape`` of one of the
    property's display rules is replaced by a new field description carrying
    that rule's display name; fields without a matching rule are kept as
    they are. The resulting list replaces
    ``form_fields[entity_class][prop["property"]]``.

    Args:
        shacl: The SHACL graph, used to read additional properties of the
            matched shape.
        form_fields (dict): The form fields to update in place.
        entity_class (str): The class of the entity.
        field_info_list (list): The current field descriptions of the property.
        prop (dict): The property entry from the display rules; must contain
            ``property`` and ``displayRules``.
    """
    new_field_info_list = []

    for original_field in field_info_list:
        node_shape = original_field.get("nodeShape")
        # Find the display rule that targets the field's shape. Rules that
        # declare no "shape" are skipped instead of raising KeyError.
        matching_rule = next(
            (
                rule
                for rule in prop["displayRules"]
                if "shape" in rule and rule["shape"] == node_shape
            ),
            None,
        )

        if not matching_rule:
            # No matching rule: keep the original field
            new_field_info_list.append(original_field)
            continue

        new_field = {
            "entityType": entity_class,
            "objectClass": original_field.get("objectClass"),
            "uri": prop["property"],
            # NOTE(review): the fields built from the SHACL query store a
            # "datatypes" list, not "datatype", so this is presumably always
            # None - confirm whether "datatypes" should be carried over.
            "datatype": original_field.get("datatype"),
            "min": original_field.get("min"),
            "max": original_field.get("max"),
            "hasValue": original_field.get("hasValue"),
            "nodeShape": node_shape,
            "nodeShapes": original_field.get("nodeShapes"),
            "subjectShape": original_field.get("subjectShape"),
            "nestedShape": original_field.get("nestedShape"),
            # A rule without its own displayName keeps the name the field
            # already has.
            "displayName": matching_rule.get(
                "displayName", original_field.get("displayName")
            ),
            "optionalValues": original_field.get("optionalValues", []),
            "orderedBy": original_field.get("orderedBy"),
            "or": original_field.get("or", []),
        }

        if "intermediateRelation" in original_field:
            new_field["intermediateRelation"] = original_field["intermediateRelation"]

        # Add the fixed-value properties declared by the matched SHACL shape
        additional_properties = extract_additional_properties(
            shacl, matching_rule["shape"]
        )
        if additional_properties:
            new_field["additionalProperties"] = additional_properties

        new_field_info_list.append(new_field)

    form_fields[entity_class][prop["property"]] = new_field_info_list

712 

713 

def extract_additional_properties(shacl, shape_uri):
    """
    Collect the sh:hasValue constraints declared by a SHACL node shape.

    Args:
        shacl: The SHACL graph to query.
        shape_uri (str): The URI of the SHACL shape.

    Returns:
        dict: Property path -> required value, both as strings; when a path
        appears in several rows the last one wins.
    """
    has_value_query = prepareQuery(
        """
        SELECT ?predicate ?hasValue
        WHERE {
            ?shape a sh:NodeShape ;
                   sh:property ?property .
            ?property sh:path ?predicate ;
                      sh:hasValue ?hasValue .
        }
        """,
        initNs={"sh": "http://www.w3.org/ns/shacl#"},
    )
    rows = shacl.query(has_value_query, initBindings={"shape": URIRef(shape_uri)})
    return {str(row.predicate): str(row.hasValue) for row in rows}

748 

749 

def order_form_fields(form_fields, display_rules):
    """
    Order the form fields as dictated by the display rules.

    Args:
        form_fields (dict): The form fields, possibly already enriched by the
            display rules (entity type -> predicate -> fields).
        display_rules (list): The display rules configuration.

    Returns:
        OrderedDict: For each rule class present in ``form_fields``, its
        predicates in ``displayProperties`` order followed by the remaining
        ones. Classes not mentioned by any rule are not included. Without
        display rules ``form_fields`` is returned unchanged.
    """
    if not display_rules:
        return form_fields

    ordered = OrderedDict()
    for rule in display_rules:
        entity_class = rule.get("class")
        if not entity_class or entity_class not in form_fields:
            continue

        class_fields = form_fields[entity_class]
        declared = [entry["property"] for entry in rule.get("displayProperties", [])]
        # Properties the rule does not mention keep their relative order
        # and are appended after the declared ones.
        leftovers = [prop for prop in class_fields if prop not in declared]

        ordered[entity_class] = OrderedDict(
            (prop, class_fields[prop])
            for prop in declared + leftovers
            if prop in class_fields
        )
    return ordered

784 

785 

def get_valid_predicates(triples):
    """
    Work out which predicates may be added to or removed from an entity.

    The entity is described by ``triples``; its SHACL constraints are read
    from the application's SHACL graph for its highest-priority class (or for
    all its classes when no priority can be determined).

    Args:
        triples: Iterable of (subject, predicate, object) triples of the entity.

    Returns:
        tuple: ``(can_be_added, can_be_deleted, datatypes, mandatory_values,
        optional_values, s_types, valid_predicates)``.

        Without types or without SHACL shapes, the first two items are the
        entity's existing predicates, ``datatypes`` maps each of them to
        ``XSD.string``, both value mappings are empty and the last item is a
        list of predicate strings. Otherwise the first two items are lists of
        predicate strings, ``datatypes`` maps predicates to lists of datatype
        strings and the last item is a set of predicate strings.
    """
    shacl = get_shacl_graph()

    existing_predicates = [triple[1] for triple in triples]
    s_types = [triple[2] for triple in triples if triple[1] == RDF.type]

    # Without a type or without shapes there is nothing to validate against:
    # every existing predicate is allowed and typed as a plain string.
    if not s_types or not shacl:
        default_datatypes = {
            str(predicate): XSD.string for predicate in existing_predicates
        }
        return (
            existing_predicates,
            existing_predicates,
            default_datatypes,
            dict(),
            dict(),
            s_types,
            [str(predicate) for predicate in existing_predicates],
        )

    # Count the occurrences of each predicate in a single pass.
    predicate_counts = defaultdict(int)
    for predicate in existing_predicates:
        predicate_counts[str(predicate)] += 1

    highest_priority_class = get_highest_priority_class(s_types)
    s_types = [highest_priority_class] if highest_priority_class else s_types

    query_string = f"""
        SELECT ?predicate ?datatype ?maxCount ?minCount ?hasValue (GROUP_CONCAT(?optionalValue; separator=",") AS ?optionalValues) WHERE {{
            ?shape sh:targetClass ?type ;
                   sh:property ?property .
            VALUES ?type {{<{'> <'.join(s_types)}>}}
            ?property sh:path ?predicate .
            OPTIONAL {{?property sh:datatype ?datatype .}}
            OPTIONAL {{?property sh:maxCount ?maxCount .}}
            OPTIONAL {{?property sh:minCount ?minCount .}}
            OPTIONAL {{?property sh:hasValue ?hasValue .}}
            OPTIONAL {{
                ?property sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?property sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
            }}
            FILTER (isURI(?predicate))
        }}
        GROUP BY ?predicate ?datatype ?maxCount ?minCount ?hasValue
    """

    query = prepareQuery(
        query_string,
        initNs={
            "sh": "http://www.w3.org/ns/shacl#",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        },
    )
    rows = list(shacl.query(query))

    can_be_added = set()
    can_be_deleted = set()
    mandatory_values = defaultdict(list)
    datatypes = defaultdict(list)
    optional_values = dict()

    for row in rows:
        predicate = str(row.predicate)

        # Properties without an explicit datatype default to xsd:string.
        datatypes[predicate].append(
            str(row.datatype) if row.datatype else str(XSD.string)
        )
        optional_values.setdefault(predicate, list()).extend(
            row.optionalValues.split(",") if row.optionalValues else []
        )

        if row.hasValue:
            # A fixed value is mandatory: it is neither addable nor deletable.
            mandatory_values[predicate].append(str(row.hasValue))
            continue

        current_count = predicate_counts.get(predicate, 0)
        min_count = 0 if row.minCount is None else int(row.minCount)
        max_reached = (
            row.maxCount is not None and int(row.maxCount) <= current_count
        )

        if not max_reached:
            can_be_added.add(predicate)
        # Deleting is allowed unless the entity sits exactly at the minimum.
        if min_count != current_count:
            can_be_deleted.add(predicate)

    return (
        list(can_be_added),
        list(can_be_deleted),
        dict(datatypes),
        mandatory_values,
        optional_values,
        s_types,
        {str(row.predicate) for row in rows},
    )

926 

927 

def _invalid_type_message(custom_filter, new_value, predicate, s_types, type_labels):
    """Build the localized "value has the wrong type" error for *predicate*.

    ``type_labels`` are the already human-readable names of the accepted
    classes or datatypes; each one is wrapped in ``<code>`` for display.
    """
    return gettext(
        "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires values of type %(o_types)s",
        new_value=custom_filter.human_readable_predicate(new_value, s_types),
        property=custom_filter.human_readable_predicate(predicate, s_types),
        o_types=", ".join(f"<code>{label}</code>" for label in type_labels),
    )


def validate_new_triple(
    subject, predicate, new_value, action: str, old_value=None, entity_types=None
):
    """Validate a prospective change to ``subject predicate value`` against SHACL.

    Args:
        subject: URI of the entity being edited.
        predicate: URI of the property being changed.
        new_value: Raw candidate value supplied by the caller.
        action: ``"create"`` or ``"delete"``; any other value is treated as
            an update (the number of values does not change).
        old_value: Value being replaced or removed, if any. When a term with
            the same string form exists in the data graph for this
            subject/predicate, that graph term is used instead.
        entity_types: A type URI, or list of type URIs, used only when the
            data graph holds no ``rdf:type`` for ``subject``.

    Returns:
        A ``(valid_value, old_value, error_message)`` tuple. ``valid_value``
        is the ``URIRef``/``Literal`` to store, or ``None`` when validation
        failed or ``action`` is ``"delete"``. ``error_message`` is ``""`` on
        success.
    """
    data_graph = fetch_data_graph_for_subject(subject)
    if old_value is not None:
        matching_triples = [
            triple[2]
            for triple in data_graph.triples((URIRef(subject), URIRef(predicate), None))
            if str(triple[2]) == str(old_value)
        ]
        # Only update old_value if we found a match in the graph
        if matching_triples:
            old_value = matching_triples[0]

    if not len(get_shacl_graph()):
        # If there's no SHACL, we accept any value but preserve datatype if available
        if validators.url(new_value):
            return URIRef(new_value), old_value, ""
        # Preserve the datatype of the old value if it's a Literal
        if (
            old_value is not None
            and isinstance(old_value, Literal)
            and old_value.datatype
        ):
            return Literal(new_value, datatype=old_value.datatype), old_value, ""
        return Literal(new_value), old_value, ""

    # Get entity types from the data graph
    s_types = [
        triple[2] for triple in data_graph.triples((URIRef(subject), RDF.type, None))
    ]

    # If entity_types is provided, use it (useful for nested entities being created).
    # Copy the list: s_types is extended below and the caller's list must not
    # be modified as a side effect.
    if entity_types and not s_types:
        if isinstance(entity_types, list):
            s_types = list(entity_types)
        else:
            s_types = [entity_types]

    # Get types for entities that have this subject as their object
    # This is crucial for proper SHACL validation in cases where constraints depend on the context
    # Example: When validating an identifier's value (e.g., DOI, ISSN, ORCID):
    # - The identifier itself is of type datacite:Identifier
    # - But its format constraints depend on what owns it:
    #   * A DOI for an article follows one pattern
    #   * An ISSN for a journal follows another
    #   * An ORCID for a person follows yet another
    # By including these "inverse" types, we ensure validation considers the full context
    inverse_types = []
    for s, _, _ in data_graph.triples((None, None, URIRef(subject))):
        # Get the types of the entity that has the subject as its object
        inverse_types.extend(t[2] for t in data_graph.triples((s, RDF.type, None)))

    # Add inverse types to s_types
    s_types.extend(inverse_types)

    query = f"""
        PREFIX sh: <http://www.w3.org/ns/shacl#>
        SELECT DISTINCT ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message
            (GROUP_CONCAT(DISTINCT COALESCE(?optionalValue, ""); separator=",") AS ?optionalValues)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionPath, ""); separator=",") AS ?conditionPaths)
            (GROUP_CONCAT(DISTINCT COALESCE(?conditionValue, ""); separator=",") AS ?conditionValues)
        WHERE {{
            ?shape sh:targetClass ?type ;
                sh:property ?propertyShape .
            ?propertyShape sh:path ?path .
            FILTER(?path = <{predicate}>)
            VALUES ?type {{<{'> <'.join(s_types)}>}}
            OPTIONAL {{?propertyShape sh:datatype ?datatype .}}
            OPTIONAL {{?propertyShape sh:maxCount ?maxCount .}}
            OPTIONAL {{?propertyShape sh:minCount ?minCount .}}
            OPTIONAL {{?propertyShape sh:class ?a_class .}}
            OPTIONAL {{
                ?propertyShape sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?orConstraint .
                ?orConstraint sh:datatype ?datatype .
                OPTIONAL {{?orConstraint sh:class ?class .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:classIn ?classInList .
                ?classInList rdf:rest*/rdf:first ?classIn .
            }}
            OPTIONAL {{
                ?propertyShape sh:in ?list .
                ?list rdf:rest*/rdf:first ?optionalValue .
            }}
            OPTIONAL {{
                ?propertyShape sh:pattern ?pattern .
                OPTIONAL {{?propertyShape sh:message ?message .}}
            }}
            OPTIONAL {{
                ?propertyShape sh:condition ?conditionNode .
                ?conditionNode sh:path ?conditionPath ;
                    sh:hasValue ?conditionValue .
            }}
        }}
        GROUP BY ?path ?datatype ?a_class ?classIn ?maxCount ?minCount ?pattern ?message
    """
    shacl = get_shacl_graph()
    custom_filter = get_custom_filter()
    results = shacl.query(query)
    property_exists = [row.path for row in results]
    if not property_exists:
        # s_types can be empty (no rdf:type in the graph, no entity_types and
        # no inverse types); fall back to an empty label instead of raising
        # IndexError on s_types[0].
        s_type_label = (
            custom_filter.human_readable_predicate(s_types[0], s_types)
            if s_types
            else ""
        )
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s is not allowed for resources of type %(s_type)s",
                predicate=custom_filter.human_readable_predicate(predicate, s_types),
                s_type=s_type_label,
            ),
        )

    datatypes = [row.datatype for row in results if row.datatype is not None]
    classes = [row.a_class for row in results if row.a_class]
    classes.extend([row.classIn for row in results if row.classIn])
    optional_values_str = [row.optionalValues for row in results if row.optionalValues]
    optional_values_str = optional_values_str[0] if optional_values_str else ""
    optional_values = [value for value in optional_values_str.split(",") if value]

    # Only the first row carrying a count is considered.
    max_count = [row.maxCount for row in results if row.maxCount]
    min_count = [row.minCount for row in results if row.minCount]
    max_count = int(max_count[0]) if max_count else None
    min_count = int(min_count[0]) if min_count else None

    current_values = list(
        data_graph.triples((URIRef(subject), URIRef(predicate), None))
    )
    current_count = len(current_values)

    # Number of values the property would have after the action is applied.
    if action == "create":
        new_count = current_count + 1
    elif action == "delete":
        new_count = current_count - 1
    else:  # update
        new_count = current_count

    if max_count is not None and new_count > max_count:
        value = gettext("value") if max_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s allows at most %(max_count)s %(value)s",
                predicate=custom_filter.human_readable_predicate(predicate, s_types),
                max_count=max_count,
                value=value,
            ),
        )
    if min_count is not None and new_count < min_count:
        value = gettext("value") if min_count == 1 else gettext("values")
        return (
            None,
            old_value,
            gettext(
                "The property %(predicate)s requires at least %(min_count)s %(value)s",
                predicate=custom_filter.human_readable_predicate(predicate, s_types),
                min_count=min_count,
                value=value,
            ),
        )

    # For delete operations, we only need to validate cardinality constraints (which we've already done)
    # No need to validate the datatype or class of the value being deleted
    if action == "delete":
        return None, old_value, ""

    if optional_values and new_value not in optional_values:
        optional_value_labels = [
            custom_filter.human_readable_predicate(value, s_types)
            for value in optional_values
        ]
        return (
            None,
            old_value,
            gettext(
                "<code>%(new_value)s</code> is not a valid value. The <code>%(property)s</code> property requires one of the following values: %(o_values)s",
                new_value=custom_filter.human_readable_predicate(new_value, s_types),
                property=custom_filter.human_readable_predicate(predicate, s_types),
                o_values=", ".join(
                    [f"<code>{label}</code>" for label in optional_value_labels]
                ),
            ),
        )

    # Check pattern constraints
    for row in results:
        if not row.pattern:
            continue

        # A pattern may be guarded by conditions (path/value pairs); it only
        # applies when every non-empty pair exists as a triple on the subject.
        condition_paths = row.conditionPaths.split(",") if row.conditionPaths else []
        condition_values = row.conditionValues.split(",") if row.conditionValues else []
        conditions_met = all(
            any(data_graph.triples((URIRef(subject), URIRef(path), URIRef(value))))
            for path, value in zip(condition_paths, condition_values)
            if path and value
        )

        # Only validate pattern if conditions are met
        if conditions_met:
            pattern = str(row.pattern)
            # NOTE(review): re.match anchors at the start of the value, whereas
            # SHACL sh:pattern follows SPARQL REGEX (unanchored) - confirm
            # whether re.search is the intended semantics.
            if not re.match(pattern, new_value):
                error_message = (
                    str(row.message)
                    if row.message
                    else f"Value must match pattern: {pattern}"
                )
                return None, old_value, error_message

    if classes:
        # The value must be a URL and must be accepted as an instance of one
        # of the required classes; both failures produce the same message.
        valid_value = (
            convert_to_matching_class(new_value, classes, entity_types=s_types)
            if validators.url(new_value)
            else None
        )
        if valid_value is None:
            class_labels = [
                custom_filter.human_readable_predicate(o_class, s_types)
                for o_class in classes
            ]
            return (
                None,
                old_value,
                _invalid_type_message(
                    custom_filter, new_value, predicate, s_types, class_labels
                ),
            )
        return valid_value, old_value, ""
    elif datatypes:
        valid_value = convert_to_matching_literal(new_value, datatypes)
        if valid_value is None:
            datatype_labels = [get_datatype_label(datatype) for datatype in datatypes]
            return (
                None,
                old_value,
                _invalid_type_message(
                    custom_filter, new_value, predicate, s_types, datatype_labels
                ),
            )
        return valid_value, old_value, ""

    # If no datatypes or classes are specified, determine the term type from
    # old_value and new_value
    if isinstance(old_value, Literal):
        if old_value.datatype:
            valid_value = Literal(new_value, datatype=old_value.datatype)
        else:
            valid_value = Literal(new_value, datatype=XSD.string)
    elif isinstance(old_value, URIRef):
        # If old_value is a URIRef but new_value is None, return old_value
        if new_value is None:
            return old_value, old_value, ""
        valid_value = URIRef(new_value)
    elif new_value is not None and validators.url(new_value):
        valid_value = URIRef(new_value)
    else:
        valid_value = Literal(new_value, datatype=XSD.string)
    return valid_value, old_value, ""

1216 

1217 

def convert_to_matching_class(object_value, classes, entity_types=None):
    """Return ``object_value`` as a ``URIRef`` if it is accepted for ``classes``.

    Args:
        object_value: Candidate object; must be a URL to be accepted.
        classes: Class URIs the object is required to belong to.
        entity_types: Optional type URI, or list of them, used as the object's
            types when the data graph holds no ``rdf:type`` for it.

    Returns:
        ``URIRef(object_value)`` when accepted, otherwise ``None``.
    """
    # Nothing to check against, or nothing to check.
    if object_value is None or not classes:
        return None

    # Only URLs can denote class instances.
    if not validators.url(str(object_value)):
        return None

    # Collect the rdf:type values recorded for the object.
    graph = fetch_data_graph_for_subject(object_value)
    found_types = {
        str(triple[2])
        for triple in graph.triples((URIRef(object_value), RDF.type, None))
    }

    # Fall back to the supplied types when the graph knows none.
    if entity_types and not found_types:
        found_types = (
            set(entity_types) if isinstance(entity_types, list) else {entity_types}
        )

    required = {str(required_class) for required_class in classes}

    # NOTE(review): when entity_types is truthy the value is accepted whether
    # or not any type matched `classes`; the original code labelled this a
    # special case for a test - confirm it is intended.
    if found_types & required or entity_types:
        return URIRef(object_value)

    return None

1250 

1251 

def convert_to_matching_literal(object_value, datatypes):
    """Convert ``object_value`` to a ``Literal`` of the first datatype it satisfies.

    Each datatype is looked up in ``DATATYPE_MAPPING`` (entries are indexed as
    ``(uri, validation_func, ...)``). Datatypes with a validation function are
    tried in order. Datatypes without one cannot be checked, so they act only
    as a fallback: if no known datatype accepts the value and at least one
    datatype was unknown, the value is returned as an ``xsd:string`` literal.
    Trying all known datatypes first keeps the result independent of where an
    unknown datatype appears in ``datatypes``.

    Args:
        object_value: Raw value to convert.
        datatypes: Datatype URIs allowed for the value.

    Returns:
        A ``Literal``, or ``None`` when ``datatypes`` is empty,
        ``object_value`` is ``None``, or every datatype is known and none
        accepts the value.
    """
    # Handle edge cases
    if not datatypes or object_value is None:
        return None

    has_unknown_datatype = False
    for datatype in datatypes:
        validation_func = next(
            (d[1] for d in DATATYPE_MAPPING if str(d[0]) == str(datatype)), None
        )
        if validation_func is None:
            # Cannot validate this datatype; remember it and keep looking for
            # a known datatype that matches.
            has_unknown_datatype = True
            continue
        if validation_func(object_value):
            return Literal(object_value, datatype=datatype)

    if has_unknown_datatype:
        return Literal(object_value, datatype=XSD.string)
    return None

1268 

1269 

def get_datatype_label(datatype_uri):
    """Return a display label for ``datatype_uri``.

    Lookup order: a built-in table of common XSD datatypes, then the label
    stored in ``DATATYPE_MAPPING``, then the custom filter's
    ``human_readable_predicate``. ``None`` is returned for a ``None`` input,
    and the URI itself is returned when no better label is available.
    """
    if datatype_uri is None:
        return None

    uri_text = str(datatype_uri)

    # Human-readable names for the common XSD datatypes.
    builtin_labels = {
        str(XSD.string): "String",
        str(XSD.integer): "Integer",
        str(XSD.int): "Integer",
        str(XSD.float): "Float",
        str(XSD.double): "Double",
        str(XSD.decimal): "Decimal",
        str(XSD.boolean): "Boolean",
        str(XSD.date): "Date",
        str(XSD.time): "Time",
        str(XSD.dateTime): "DateTime",
        str(XSD.anyURI): "URI"
    }
    builtin = builtin_labels.get(uri_text)
    if builtin is not None:
        return builtin

    # Next, look for a label in the project-wide datatype mapping.
    for mapped_uri, _, mapped_label in DATATYPE_MAPPING:
        if str(mapped_uri) == uri_text:
            return mapped_label

    # Finally, ask the custom filter; without one the URI is returned as is.
    custom_filter = get_custom_filter()
    if not custom_filter:
        return datatype_uri

    custom_label = custom_filter.human_readable_predicate(datatype_uri, [])
    # A label that is merely the tail of the URI adds nothing, so the full
    # URI is returned in that case.
    label_is_uri_tail = (
        custom_label
        and custom_label != datatype_uri
        and datatype_uri.endswith(custom_label)
    )
    return datatype_uri if label_is_uri_tail else custom_label