Coverage for heritrace / utils / shacl_display.py: 97%

316 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import json 

6from collections import OrderedDict, defaultdict 

7from collections.abc import Iterable 

8from dataclasses import dataclass 

9from pathlib import Path 

10from typing import cast 

11 

12from flask import Flask 

13from rdflib import Graph, URIRef 

14from rdflib.plugins.sparql import prepareQuery 

15from rdflib.plugins.sparql.sparql import Query 

16from rdflib.query import Result, ResultRow 

17 

18from heritrace.sparql import select_results 

19from heritrace.utils.filters import Filter 

20 

21 

@dataclass(slots=True)
class ShaclProcessingContext:
    """State shared by the functions that turn SHACL shapes into form fields."""

    # SHACL graph that the queries in this module are run against.
    shacl: Graph
    # Display-rule configuration; None when no rules are available.
    display_rules: list[dict[str, object]] | None
    # Flask application; its config supplies "DATASET_DB_URL".
    app: Flask
    # URIs of the shapes currently being expanded, used to stop recursion.
    processed_shapes: set[str]

28 

29 

30@dataclass(slots=True) 

31class _ParsedRow: 

32 subject_shape: str 

33 entity_type: str 

34 predicate: str 

35 node_shape: str | None 

36 has_value: str | None 

37 object_class: str | None 

38 min_count: int 

39 max_count: int | None 

40 datatype: str | None 

41 optional_values: list[str] 

42 or_nodes: list[str] 

43 entity_key: tuple[str, str] 

44 condition_entry: dict[str, object] 

45 node_shapes: list[str] 

46 

47 

# Shared SELECT query used both for the whole graph and, with ?shape pre-bound,
# for a single nested shape. For every shape with sh:targetClass it returns its
# property shapes (URI paths only) together with the optional constraints:
# sh:node (and that node's target class), sh:or alternatives (datatypes, node
# shapes, hasValue entries), sh:datatype, sh:minCount / sh:maxCount,
# sh:hasValue, sh:in, sh:condition, sh:pattern and sh:message.
# ?optional_values and ?or_nodes are comma-joined by GROUP_CONCAT and are split
# again on "," by the row parser.
COMMON_SPARQL_QUERY = prepareQuery(
    """
    SELECT ?shape ?type ?predicate ?node_shape ?datatype
           ?max_count ?min_count ?has_value ?object_class
           ?condition_path ?condition_value ?pattern ?message
           (GROUP_CONCAT(?optional_value; separator=",")
            AS ?optional_values)
           (GROUP_CONCAT(?or_node; separator=",") AS ?or_nodes)
    WHERE {
        ?shape sh:targetClass ?type ;
               sh:property ?property .
        ?property sh:path ?predicate .
        OPTIONAL {
            ?property sh:node ?node_shape .
            OPTIONAL {
                ?node_shape sh:targetClass ?object_class .
            }
        }
        OPTIONAL {
            ?property sh:or ?orList .
            {
                ?orList rdf:rest*/rdf:first ?or_constraint .
                ?or_constraint sh:datatype ?datatype .
            } UNION {
                ?orList rdf:rest*/rdf:first ?or_node_shape .
                ?or_node_shape sh:node ?or_node .
            } UNION {
                ?orList rdf:rest*/rdf:first ?or_constraint .
                ?or_constraint sh:hasValue ?optional_value .
            }
        }
        OPTIONAL { ?property sh:datatype ?datatype . }
        OPTIONAL { ?property sh:maxCount ?max_count . }
        OPTIONAL { ?property sh:minCount ?min_count . }
        OPTIONAL { ?property sh:hasValue ?has_value . }
        OPTIONAL {
            ?property sh:in ?list .
            ?list rdf:rest*/rdf:first ?optional_value .
        }
        OPTIONAL {
            ?property sh:condition ?condition_node .
            ?condition_node sh:path ?condition_path ;
                            sh:hasValue ?condition_value .
        }
        OPTIONAL { ?property sh:pattern ?pattern . }
        OPTIONAL { ?property sh:message ?message . }
        FILTER (isURI(?predicate))
    }
    GROUP BY ?shape ?type ?predicate ?node_shape ?datatype
             ?max_count ?min_count ?has_value ?object_class
             ?condition_path ?condition_value ?pattern ?message
""",
    initNs={
        "sh": "http://www.w3.org/ns/shacl#",
        "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
    },
)

105 

106 

def _parse_row(row: ResultRow) -> _ParsedRow:
    """Convert one row of the shared SHACL query into a ``_ParsedRow``.

    Args:
        row: A result row exposing the variables selected by
            ``COMMON_SPARQL_QUERY`` as attributes.

    Returns:
        The row with every term converted to ``str``/``int``. Missing
        ``min_count`` becomes 0, missing ``max_count`` becomes ``None``, and
        the comma-joined ``optional_values`` / ``or_nodes`` are split into
        lists with empty items removed.
    """
    subject_shape = str(row.shape)
    entity_type = str(row.type)
    predicate = str(row.predicate)
    node_shape = str(row.node_shape) if row.node_shape else None
    # rdflib literals such as ``false``, ``0`` or ``""`` evaluate as falsy, so
    # a truthiness test would treat a legitimate sh:hasValue as missing.
    has_value = None if row.has_value is None else str(row.has_value)
    object_class = str(row.object_class) if row.object_class else None
    min_count = 0 if row.min_count is None else int(row.min_count)
    max_count = None if row.max_count is None else int(row.max_count)
    datatype = str(row.datatype) if row.datatype else None
    optional_values = [v for v in (row.optional_values or "").split(",") if v]
    or_nodes = [v for v in (row.or_nodes or "").split(",") if v]

    condition_entry: dict[str, object] = {}
    # Same reasoning as for has_value: the condition value may be a falsy
    # literal, so only its absence (None) disables the condition.
    if row.condition_path and row.condition_value is not None:
        condition_entry["condition"] = {
            "path": str(row.condition_path),
            "value": str(row.condition_value),
        }
    if row.pattern:
        condition_entry["pattern"] = str(row.pattern)
    if row.message:
        condition_entry["message"] = str(row.message)

    # The directly referenced node shape comes first, then the sh:or ones.
    node_shapes = [node_shape, *or_nodes] if node_shape else list(or_nodes)

    return _ParsedRow(
        subject_shape=subject_shape,
        entity_type=entity_type,
        predicate=predicate,
        node_shape=node_shape,
        has_value=has_value,
        object_class=object_class,
        min_count=min_count,
        max_count=max_count,
        datatype=datatype,
        optional_values=optional_values,
        or_nodes=or_nodes,
        entity_key=(entity_type, subject_shape),
        condition_entry=condition_entry,
        node_shapes=node_shapes,
    )

152 

153 

def _find_existing_field(
    fields: list[dict[str, object]],
    parsed: _ParsedRow,
) -> dict[str, object] | None:
    """Return the first field describing the same constraint as ``parsed``.

    Two entries match when node shape(s), subject shape, required value,
    object class, cardinality bounds and optional values are all equal.
    Returns ``None`` when no field matches.
    """
    expected = {
        "nodeShape": parsed.node_shape,
        "nodeShapes": parsed.node_shapes,
        "subjectShape": parsed.subject_shape,
        "hasValue": parsed.has_value,
        "objectClass": parsed.object_class,
        "min": parsed.min_count,
        "max": parsed.max_count,
        "optionalValues": parsed.optional_values,
    }
    return next(
        (
            candidate
            for candidate in fields
            if all(candidate.get(key) == value for key, value in expected.items())
        ),
        None,
    )

171 

172 

def _process_or_nodes(
    ctx: ShaclProcessingContext,
    parsed: _ParsedRow,
    custom_filter: Filter,
    field_info: dict[str, object],
    depth: int,
) -> None:
    """Fill ``field_info["or"]`` with one entry per sh:or node shape.

    Each alternative shares the predicate, cardinality, required value,
    optional values and conditions of ``parsed``. A nested shape is attached
    unless that node shape is already being processed.
    """
    alternatives: list[dict[str, object]] = []
    field_info["or"] = alternatives
    conditions = [parsed.condition_entry] if parsed.condition_entry else []

    for or_shape in parsed.or_nodes:
        target_class = get_shape_target_class(ctx.shacl, or_shape)
        object_class = get_object_class(ctx.shacl, or_shape, parsed.predicate)
        label = custom_filter.human_readable_class((target_class, or_shape))
        alternative: dict[str, object] = {
            "entityType": target_class,
            "uri": parsed.predicate,
            "displayName": label,
            "subjectShape": parsed.subject_shape,
            "nodeShape": or_shape,
            "min": parsed.min_count,
            "max": parsed.max_count,
            "hasValue": parsed.has_value,
            "objectClass": object_class,
            "optionalValues": parsed.optional_values,
            "conditions": list(conditions),
            "shouldBeDisplayed": True,
        }
        # Skip shapes already on the processing stack to avoid recursing forever.
        if or_shape not in ctx.processed_shapes:
            alternative["nestedShape"] = process_nested_shapes(
                ctx,
                or_shape,
                depth=depth + 1,
            )
        alternatives.append(alternative)

208 

209 

def _process_single_row(
    ctx: ShaclProcessingContext,
    parsed: _ParsedRow,
    custom_filter: Filter,
    form_fields: defaultdict[tuple[str, str], dict[str, list[dict[str, object]]]],
    depth: int,
) -> None:
    """Merge one parsed row into ``form_fields``.

    When an equivalent field already exists for the predicate, only its
    datatypes and conditions are extended. Otherwise a new field is built,
    including its nested shape and sh:or alternatives, and appended.
    """
    siblings = form_fields[parsed.entity_key].setdefault(parsed.predicate, [])
    match = _find_existing_field(siblings, parsed)

    if match:
        if parsed.datatype:
            known_datatypes = cast("list", match.setdefault("datatypes", []))
            if str(parsed.datatype) not in known_datatypes:
                known_datatypes.append(str(parsed.datatype))
        if parsed.condition_entry:
            cast("list", match.setdefault("conditions", [])).append(
                parsed.condition_entry
            )
        return

    datatypes = [parsed.datatype] if parsed.datatype else []
    conditions = [parsed.condition_entry] if parsed.condition_entry else []
    new_field: dict[str, object] = {
        "entityType": parsed.entity_type,
        "uri": parsed.predicate,
        "nodeShape": parsed.node_shape,
        "nodeShapes": parsed.node_shapes,
        "subjectShape": parsed.subject_shape,
        "entityKey": parsed.entity_key,
        "datatypes": datatypes,
        "min": parsed.min_count,
        "max": parsed.max_count,
        "hasValue": parsed.has_value,
        "objectClass": parsed.object_class,
        "optionalValues": parsed.optional_values,
        "conditions": conditions,
        "inputType": determine_input_type(parsed.datatype),
        "shouldBeDisplayed": True,
    }

    # Expand the referenced shape unless it is already being processed.
    can_expand = bool(parsed.node_shape) and parsed.node_shape not in ctx.processed_shapes
    if can_expand:
        new_field["nestedShape"] = process_nested_shapes(
            ctx,
            parsed.node_shape,
            depth=depth + 1,
        )

    if parsed.or_nodes:
        _process_or_nodes(ctx, parsed, custom_filter, new_field, depth)

    form_fields[parsed.entity_key][parsed.predicate].append(new_field)

266 

267 

def process_query_results(
    ctx: ShaclProcessingContext,
    results: Iterable[ResultRow],
    depth: int = 0,
) -> defaultdict[tuple[str, str], dict[str, list[dict[str, object]]]]:
    """Build the form-field structure from SHACL query result rows.

    Args:
        ctx: Shared processing context (graph, display rules, app).
        results: Rows produced by the shared SHACL query.
        depth: Current nesting depth, forwarded to nested-shape processing.

    Returns:
        A mapping ``(class, shape) -> predicate -> list of field dicts``.
    """
    form_fields: defaultdict[tuple[str, str], dict[str, list[dict[str, object]]]] = (
        defaultdict(dict)
    )

    # JSON is UTF-8 by specification; be explicit so that reading does not
    # depend on the platform's default locale encoding.
    context_path = Path(__file__).parent / "context.json"
    with context_path.open(encoding="utf-8") as config_file:
        context = json.load(config_file)["@context"]

    custom_filter = Filter(context, ctx.display_rules, ctx.app.config["DATASET_DB_URL"])

    for row in results:
        parsed = _parse_row(row)
        _process_single_row(ctx, parsed, custom_filter, form_fields, depth)

    return form_fields

287 

288 

def process_nested_shapes(
    ctx: ShaclProcessingContext,
    shape_uri: str,
    depth: int = 0,
) -> list[dict[str, object]]:
    """Return the flattened list of fields declared by ``shape_uri``.

    The shape is registered in ``ctx.processed_shapes`` while it is being
    expanded; a shape already in that set yields an empty list. When display
    rules are configured they are applied and used to order the fields.
    """
    if shape_uri in ctx.processed_shapes:
        return []

    ctx.processed_shapes.add(shape_uri)
    shape_rows = execute_shacl_query(
        ctx.shacl, COMMON_SPARQL_QUERY, {"shape": URIRef(shape_uri)}
    )
    grouped = process_query_results(
        ctx,
        select_results(shape_rows),
        depth=depth,
    )

    if ctx.display_rules:
        grouped = apply_display_rules(ctx.shacl, grouped, ctx.display_rules)
        grouped = order_form_fields(grouped, ctx.display_rules)

    # Flatten entity -> predicate -> fields into a single list of fields.
    nested_fields = [
        field
        for fields_by_predicate in grouped.values()
        for field_list in fields_by_predicate.values()
        for field in field_list
    ]

    ctx.processed_shapes.remove(shape_uri)
    return nested_fields

320 

321 

322def get_property_order( 

323 entity_type: str, 

324 display_rules: list[dict[str, object]] | None, 

325) -> list[str | None]: 

326 """ 

327 Recupera l'ordine delle proprietà per un tipo di entità dalle regole di 

328 visualizzazione. 

329 

330 Argomenti: 

331 entity_type (str): L'URI del tipo di entità. 

332 

333 Restituisce: 

334 list: Una lista di URI di proprietà nell'ordine desiderato. 

335 """ 

336 if not display_rules: 

337 return [] 

338 

339 for rule in display_rules: 

340 if rule.get("class") == entity_type and "propertyOrder" in rule: 

341 return cast("list[str | None]", rule["propertyOrder"]) 

342 if rule.get("class") == entity_type: 

343 display_props = cast( 

344 "list[dict[str, object]]", rule.get("displayProperties", []) 

345 ) 

346 return [ 

347 cast("str | None", prop.get("property") or prop.get("virtual_property")) 

348 for prop in display_props 

349 if prop.get("property") or prop.get("virtual_property") 

350 ] 

351 return [] 

352 

353 

def order_fields(
    fields: list[dict[str, object]],
    property_order: list[str],
) -> list[dict[str, object]]:
    """Sort fields according to a list of property URIs.

    Args:
        fields: Field dictionaries to sort.
        property_order: Property URIs in the desired order.

    Returns:
        A new sorted list. Fields whose ``predicate`` (or, failing that,
        ``uri``) is not listed are placed at the end, keeping their relative
        order. An empty ``property_order`` returns ``fields`` unchanged and
        empty ``fields`` returns an empty list.
    """
    if not fields:
        return []
    if not property_order:
        return fields

    # Position of every predicate inside property_order.
    rank: dict[str, int] = {}
    for position, predicate in enumerate(property_order):
        rank[predicate] = position

    def sort_key(field: dict[str, object]) -> float:
        identifier = str(field.get("predicate", field.get("uri", "")))
        return rank.get(identifier, float("inf"))

    ordered = list(fields)
    ordered.sort(key=sort_key)
    return ordered

384 

385 

386def _find_matching_entity_keys( 

387 form_fields: dict[ 

388 tuple[str, str], 

389 dict[str, list[dict[str, object]]], 

390 ], 

391 entity_class: str | None, 

392 entity_shape: str | None, 

393) -> list[tuple[str, str]]: 

394 if entity_class and entity_shape: 

395 entity_key = (entity_class, entity_shape) 

396 return [entity_key] if entity_key in form_fields else [] 

397 if entity_class: 

398 return [key for key in form_fields if key[0] == entity_class] 

399 if entity_shape: 

400 return [key for key in form_fields if key[1] == entity_shape] 

401 return [] 

402 

403 

404def _get_ordered_properties_from_rule( 

405 rule: dict[str, object], 

406) -> list[str | None]: 

407 display_props = cast("list[dict[str, object]]", rule.get("displayProperties", [])) 

408 return [ 

409 cast( 

410 "str | None", 

411 prop_rule.get("property") or prop_rule.get("virtual_property"), 

412 ) 

413 for prop_rule in display_props 

414 if prop_rule.get("property") or prop_rule.get("virtual_property") 

415 ] 

416 

417 

418def _order_entity_fields( 

419 form_fields: dict[ 

420 tuple[str, str], 

421 dict[str, list[dict[str, object]]], 

422 ], 

423 entity_key: tuple[str, str], 

424 ordered_properties: list[str | None], 

425 ordered_form_fields: OrderedDict[ 

426 tuple[str, str], 

427 OrderedDict[str, list[dict[str, object]]], 

428 ], 

429) -> None: 

430 ordered_form_fields[entity_key] = OrderedDict() 

431 for prop in ordered_properties: 

432 if prop in form_fields[entity_key]: 

433 ordered_form_fields[entity_key][prop] = form_fields[entity_key][prop] 

434 # Aggiungi le proprietà rimanenti non specificate nell'ordine 

435 for prop in form_fields[entity_key]: 

436 if prop not in ordered_properties: 

437 ordered_form_fields[entity_key][prop] = form_fields[entity_key][prop] 

438 

439 

def order_form_fields(
    form_fields: dict[
        tuple[str, str],
        dict[str, list[dict[str, object]]],
    ],
    display_rules: list[dict[str, object]] | None,
) -> (
    OrderedDict[
        tuple[str, str],
        OrderedDict[str, list[dict[str, object]]],
    ]
    | dict[
        tuple[str, str],
        dict[str, list[dict[str, object]]],
    ]
):
    """Order the form fields according to the display rules.

    Args:
        form_fields: Form fields, possibly already modified by display rules.
        display_rules: Display-rule configuration, possibly empty or None.

    Returns:
        ``form_fields`` itself when there are no display rules; otherwise an
        ``OrderedDict`` holding only the entities matched by some rule's
        target, each with its properties ordered as listed in that rule.
    """
    if not display_rules:
        return form_fields

    ordered_form_fields = OrderedDict()
    for rule in display_rules:
        target = cast("dict[str, str]", rule.get("target", {}))
        property_sequence = _get_ordered_properties_from_rule(rule)
        for key in _find_matching_entity_keys(
            form_fields, target.get("class"), target.get("shape")
        ):
            _order_entity_fields(
                form_fields, key, property_sequence, ordered_form_fields
            )
    return ordered_form_fields

482 

483 

def apply_display_rules(
    shacl: Graph,
    form_fields: dict[
        tuple[str, str],
        dict[str, list[dict[str, object]]],
    ],
    display_rules: list[dict[str, object]],
) -> dict[
    tuple[str, str],
    dict[str, list[dict[str, object]]],
]:
    """Apply every display rule to the form fields it targets.

    Args:
        shacl: The SHACL graph.
        form_fields: Form fields extracted from the SHACL shapes; updated in
            place.
        display_rules: Display rules, each with an optional ``target``
            holding ``class`` and/or ``shape``.

    Returns:
        The same ``form_fields`` object after the rules have been applied.
    """
    for rule in display_rules:
        target = cast("dict[str, str]", rule.get("target", {}))
        # Class + shape is an exact match; class only or shape only matches
        # every key sharing that component; an empty target matches nothing.
        matching_keys = _find_matching_entity_keys(
            form_fields, target.get("class"), target.get("shape")
        )
        for key in matching_keys:
            apply_rule_to_entity(shacl, form_fields, key, rule)
    return form_fields

526 

527 

def apply_rule_to_entity(
    shacl: Graph,
    form_fields: dict[
        tuple[str, str],
        dict[str, list[dict[str, object]]],
    ],
    entity_key: tuple[str, str],
    rule: dict[str, object],
) -> None:
    """Apply a display rule to a specific entity key.

    Args:
        shacl: The SHACL graph.
        form_fields: The form fields dictionary; updated in place.
        entity_key: The entity key tuple (class, shape).
        rule: The display rule to apply.
    """
    display_props = cast("list[dict[str, object]]", rule.get("displayProperties", []))
    target_shape = cast("dict[str, str]", rule.get("target", {})).get("shape")
    entity_fields = form_fields[entity_key]

    for prop in display_props:
        prop_uri = prop.get("property") or prop.get("virtual_property")
        if not prop_uri or prop_uri not in entity_fields:
            continue

        for field_info in entity_fields[str(prop_uri)]:
            add_display_information(field_info, prop)

            # apply_display_rules_to_nested_shapes works on copies and returns
            # them, so its result has to be stored back or it has no effect.
            if "nestedShape" in field_info:
                field_info["nestedShape"] = apply_display_rules_to_nested_shapes(
                    cast("list[dict[str, object]]", field_info["nestedShape"]),
                    prop,
                    target_shape,
                )
            if "or" in field_info:
                # NOTE(review): the field itself is passed as the rule source
                # here, as in the original code; confirm this is intended.
                field_info["or"] = [
                    updated
                    for or_field in cast("list[dict[str, object]]", field_info["or"])
                    for updated in apply_display_rules_to_nested_shapes(
                        [or_field], field_info, target_shape
                    )
                ]
            if "intermediateRelation" in prop:
                handle_intermediate_relation(shacl, field_info, prop)

        # Runs once per property: it replaces the whole field list.
        if "displayRules" in prop:
            handle_sub_display_rules(
                shacl,
                form_fields,
                entity_key,
                form_fields[entity_key][str(prop_uri)],
                prop,
            )

575 

576 

577def apply_display_rules_to_nested_shapes( # noqa: C901 

578 nested_fields: list[dict[str, object]], 

579 parent_prop: dict[str, object], 

580 shape_uri: str | None, 

581) -> list[dict[str, object]]: 

582 """Apply display rules to nested shapes.""" 

583 if not nested_fields: 

584 return [] 

585 

586 # Handle case where parent_prop is not a dictionary 

587 if not isinstance(parent_prop, dict): 

588 return nested_fields 

589 

590 # Create a new list to avoid modifying the original 

591 result_fields = [] 

592 for field in nested_fields: 

593 # Create a copy of the field to avoid modifying the original 

594 new_field = field.copy() 

595 result_fields.append(new_field) 

596 

597 display_rules = cast("list[dict[str, object]]", parent_prop.get("displayRules", [])) 

598 for rule in display_rules: 

599 if rule.get("shape") == shape_uri and "nestedDisplayRules" in rule: 

600 nested_display_rules = cast( 

601 "list[dict[str, object]]", rule["nestedDisplayRules"] 

602 ) 

603 for field in result_fields: 

604 for nested_rule in nested_display_rules: 

605 field_key = field.get("predicate", field.get("uri")) 

606 if field_key == nested_rule["property"]: 

607 # Apply display properties from the rule to the field 

608 for key, value in nested_rule.items(): 

609 if key != "property": 

610 field[key] = value 

611 break 

612 

613 return result_fields 

614 

615 

616def determine_input_type(datatype: str | None) -> str: 

617 """ 

618 Determina il tipo di input appropriato basato sul datatype XSD. 

619 """ 

620 if not datatype: 

621 return "text" 

622 

623 datatype = str(datatype) 

624 datatype_to_input = { 

625 "http://www.w3.org/2001/XMLSchema#string": "text", 

626 "http://www.w3.org/2001/XMLSchema#integer": "number", 

627 "http://www.w3.org/2001/XMLSchema#decimal": "number", 

628 "http://www.w3.org/2001/XMLSchema#float": "number", 

629 "http://www.w3.org/2001/XMLSchema#double": "number", 

630 "http://www.w3.org/2001/XMLSchema#boolean": "checkbox", 

631 "http://www.w3.org/2001/XMLSchema#date": "date", 

632 "http://www.w3.org/2001/XMLSchema#time": "time", 

633 "http://www.w3.org/2001/XMLSchema#dateTime": "datetime-local", 

634 "http://www.w3.org/2001/XMLSchema#anyURI": "url", 

635 "http://www.w3.org/2001/XMLSchema#email": "email", 

636 } 

637 return datatype_to_input.get(datatype, "text") 

638 

639 

def add_display_information(
    field_info: dict[str, object],
    prop: dict[str, object],
) -> None:
    """Copy display settings from a display-rule property onto a field.

    Args:
        field_info: Field dictionary to update in place.
        prop: Property entry taken from the display rules.

    Only the settings present in ``prop`` are copied; other keys of ``prop``
    are ignored.
    """
    copied_settings = (
        "displayName",
        "shouldBeDisplayed",
        "orderedBy",
        "inputType",
        "supportsSearch",
        "minCharsForSearch",
        "searchTarget",
    )
    for setting in copied_settings:
        if setting in prop:
            field_info[setting] = prop[setting]

665 

666 

def handle_intermediate_relation(
    shacl: Graph,
    field_info: dict[str, object],
    prop: dict[str, object],
) -> None:
    """Resolve the ``intermediateRelation`` of a display property for a field.

    Args:
        shacl: The SHACL graph.
        field_info: Field dictionary to update in place.
        prop: Display-rule property carrying ``intermediateRelation`` with
            its ``class`` and ``targetEntityType``.

    The SHACL graph is queried for the property that links the intermediate
    class to a node shape targeting the target entity type. The fields nested
    under that property are grouped by URI and stored, together with the
    resolved names, in ``field_info["intermediateRelation"]``.
    """
    relation = cast("dict[str, str]", prop["intermediateRelation"])
    target_entity_type = relation["targetEntityType"]
    intermediate_class = relation["class"]

    link_query = prepareQuery(
        """
        SELECT ?property
        WHERE {
            ?shape sh:targetClass ?intermediateClass ;
                   sh:property ?propertyShape .
            ?propertyShape sh:path ?property ;
                           sh:node ?targetNode .
            ?targetNode sh:targetClass ?targetClass.
        }
    """,
        initNs={"sh": "http://www.w3.org/ns/shacl#"},
    )
    link_results = shacl.query(
        link_query,
        initBindings={
            "intermediateClass": URIRef(intermediate_class),
            "targetClass": URIRef(target_entity_type),
        },
    )

    # Only the first connecting property found is used.
    connecting_property = None
    for row in select_results(link_results):
        connecting_property = str(row.property)
        break

    intermediate_properties = {}
    target_shape = None
    for nested_field in cast(
        "list[dict[str, object]]", field_info.get("nestedShape", [])
    ):
        if nested_field.get("uri") != connecting_property:
            continue
        if "nestedShape" not in nested_field:
            continue
        for target_field in cast(
            "list[dict[str, object]]", nested_field["nestedShape"]
        ):
            uri = target_field.get("uri")
            if not uri:
                continue
            intermediate_properties.setdefault(uri, []).append(target_field)
            # The last target field declaring a subject shape wins.
            if target_field.get("subjectShape"):
                target_shape = target_field["subjectShape"]

    field_info["intermediateRelation"] = {
        "class": intermediate_class,
        "targetEntityType": target_entity_type,
        "targetShape": target_shape,
        "connectingProperty": connecting_property,
        "properties": intermediate_properties,
    }

735 

736 

def handle_sub_display_rules(
    shacl: Graph,
    form_fields: dict[
        tuple[str, str],
        dict[str, list[dict[str, object]]],
    ],
    entity_key: tuple[str, str],
    field_info_list: list[dict[str, object]],
    prop: dict[str, object],
) -> None:
    """Apply the per-shape ``displayRules`` of a display property.

    Args:
        shacl: The SHACL graph.
        form_fields: Form fields to update in place.
        entity_key: The entity key tuple (class, shape).
        field_info_list: The current fields of the property.
        prop: Display-rule property carrying ``displayRules``.

    Every field whose ``nodeShape`` equals the ``shape`` of a sub-rule is
    replaced by a rebuilt field named after that rule; the other fields are
    kept as they are. The resulting list replaces
    ``form_fields[entity_key][prop["property"]]``.
    """
    new_field_info_list = []
    entity_class = entity_key[0] if isinstance(entity_key, tuple) else entity_key

    for original_field in field_info_list:
        # Find the sub-rule declared for this field's node shape.
        sub_display_rules = cast("list[dict[str, object]]", prop["displayRules"])
        matching_rule = next(
            (
                rule
                for rule in sub_display_rules
                if rule["shape"] == original_field["nodeShape"]
            ),
            None,
        )

        if not matching_rule:
            # Without a matching rule the original field is kept.
            new_field_info_list.append(original_field)
            continue

        new_field = {
            "entityType": entity_class,
            "entityKey": entity_key,  # Store the tuple key
            "objectClass": original_field.get("objectClass"),
            "uri": prop["property"],
            # Kept for backward compatibility with readers of this key.
            "datatype": original_field.get("datatype"),
            # Fields are built with the plural "datatypes" key, so that is
            # the key that actually carries the datatype information.
            "datatypes": original_field.get("datatypes", []),
            "min": original_field.get("min"),
            "max": original_field.get("max"),
            "hasValue": original_field.get("hasValue"),
            "nodeShape": original_field.get("nodeShape"),
            "nodeShapes": original_field.get("nodeShapes"),
            "subjectShape": original_field.get("subjectShape"),
            "nestedShape": original_field.get("nestedShape"),
            "displayName": matching_rule["displayName"],
            "optionalValues": original_field.get("optionalValues", []),
            "orderedBy": original_field.get("orderedBy"),
            "or": original_field.get("or", []),
        }

        if "intermediateRelation" in original_field:
            new_field["intermediateRelation"] = original_field[
                "intermediateRelation"
            ]

        # Add the fixed-value properties declared by the SHACL shape.
        if "shape" in matching_rule:
            shape_uri = str(matching_rule["shape"])
            additional_properties = extract_additional_properties(shacl, shape_uri)
            if additional_properties:
                new_field["additionalProperties"] = additional_properties

        new_field_info_list.append(new_field)

    form_fields[entity_key][str(prop["property"])] = new_field_info_list

810 

811 

def get_shape_target_class(shacl: Graph, shape_uri: str) -> str | None:
    """Return the first ``sh:targetClass`` of a shape, or ``None`` if absent."""
    target_class_query = prepareQuery(
        """
        SELECT ?targetClass
        WHERE {
            ?shape sh:targetClass ?targetClass .
        }
    """,
        initNs={"sh": "http://www.w3.org/ns/shacl#"},
    )
    rows = select_results(
        execute_shacl_query(shacl, target_class_query, {"shape": URIRef(shape_uri)})
    )
    first_row = next(iter(rows), None)
    return None if first_row is None else str(first_row.targetClass)

826 

827 

def get_object_class(shacl: Graph, shape_uri: str, predicate_uri: str) -> str | None:
    """Return the class of the objects of a predicate within a shape.

    The class is taken from the property shape's ``sh:node`` target class,
    from ``sh:class``, or from either of those inside an ``sh:or`` list.
    Returns the first class found, or ``None`` when there is none.
    """
    object_class_query = prepareQuery(
        """
        SELECT DISTINCT ?targetClass
        WHERE {
            ?shape sh:property ?propertyShape .
            ?propertyShape sh:path ?predicate .
            {
                # Caso 1: definizione diretta con sh:node
                ?propertyShape sh:node ?nodeShape .
                ?nodeShape sh:targetClass ?targetClass .
            } UNION {
                # Caso 2: definizione diretta con sh:class
                ?propertyShape sh:class ?targetClass .
            } UNION {
                # Caso 3: definizione con sh:or che include node shapes
                ?propertyShape sh:or ?orList .
                ?orList rdf:rest*/rdf:first ?choice .
                {
                    ?choice sh:node ?nodeShape .
                    ?nodeShape sh:targetClass ?targetClass .
                } UNION {
                    ?choice sh:class ?targetClass .
                }
            }
        }
    """,
        initNs={
            "sh": "http://www.w3.org/ns/shacl#",
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        },
    )
    bindings = {"shape": URIRef(shape_uri), "predicate": URIRef(predicate_uri)}
    rows = select_results(execute_shacl_query(shacl, object_class_query, bindings))

    # Take the first row that actually carries a class.
    return next((str(row.targetClass) for row in rows if row.targetClass), None)

870 

871 

def extract_shacl_form_fields(
    shacl: Graph | None,
    display_rules: list[dict[str, object]] | None,
    app: Flask,
) -> (
    dict[
        tuple[str, str],
        dict[str, list[dict[str, object]]],
    ]
    | defaultdict[
        tuple[str, str],
        dict[str, list[dict[str, object]]],
    ]
):
    """Extract the form fields from the SHACL shapes.

    Args:
        shacl: The SHACL graph.
        display_rules: The display rules configuration.
        app: Flask application instance.

    Returns:
        A mapping whose keys are ``(class, shape)`` tuples and whose values
        map each predicate to its list of form fields. An empty dict when
        ``shacl`` is falsy.
    """
    if not shacl:
        return {}

    # A fresh context, so no shape is marked as being processed yet.
    context = ShaclProcessingContext(
        shacl=shacl,
        display_rules=display_rules,
        app=app,
        processed_shapes=set(),
    )
    all_rows = select_results(execute_shacl_query(shacl, COMMON_SPARQL_QUERY))
    return process_query_results(context, all_rows, depth=0)

914 

915 

def execute_shacl_query(
    shacl: Graph,
    query: Query,
    init_bindings: dict[str, URIRef] | None = None,
) -> Result:
    """Run a prepared SPARQL query on the SHACL graph.

    Args:
        shacl: The SHACL graph on which to execute the query.
        query: The prepared SPARQL query.
        init_bindings: Initial variable bindings; omitted from the call when
            empty or None.

    Returns:
        The query results.
    """
    extra_arguments = {"initBindings": init_bindings} if init_bindings else {}
    return shacl.query(query, **extra_arguments)

935 

936 

def extract_additional_properties(shacl: Graph, shape_uri: str) -> dict[str, str]:
    """Collect the fixed-value properties declared by a SHACL node shape.

    Args:
        shacl: The SHACL graph.
        shape_uri: URI of the SHACL shape.

    Returns:
        A dict mapping each property path that has ``sh:hasValue`` to that
        value, both as strings. When a path occurs more than once the last
        row wins.
    """
    fixed_values_query = prepareQuery(
        """
        SELECT ?predicate ?has_value
        WHERE {
            ?shape a sh:NodeShape ;
                   sh:property ?property .
            ?property sh:path ?predicate ;
                      sh:hasValue ?has_value .
        }
    """,
        initNs={"sh": "http://www.w3.org/ns/shacl#"},
    )
    fixed_values_results = shacl.query(
        fixed_values_query,
        initBindings={"shape": URIRef(shape_uri)},
    )
    return {
        str(row.predicate): str(row.has_value)
        for row in select_results(fixed_values_results)
    }