Coverage for heritrace / routes / api.py: 99%

553 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import traceback 

6from dataclasses import dataclass 

7from typing import TypedDict, cast 

8 

9from flask import ( 

10 Blueprint, 

11 Response, 

12 current_app, 

13 g, 

14 jsonify, 

15 render_template_string, 

16 request, 

17) 

18from flask_babel import gettext 

19from flask_login import current_user, login_required 

20from rdflib import RDF, XSD, Graph, Literal, URIRef 

21 

22from heritrace.apis.orcid import get_responsible_agent_uri 

23from heritrace.editor import Editor, EndpointConfig 

24from heritrace.extensions import ( 

25 get_custom_filter, 

26 get_dataset_endpoint, 

27 get_form_fields, 

28 get_provenance_endpoint, 

29) 

30from heritrace.services.resource_lock_manager import LockStatus 

31from heritrace.utils.datatypes import DATATYPE_MAPPING 

32from heritrace.utils.primary_source_utils import save_user_default_primary_source 

33from heritrace.utils.shacl_utils import determine_shape_for_classes 

34from heritrace.utils.shacl_validation import validate_new_triple 

35from heritrace.utils.sparql_utils import ( 

36 CatalogQuery, 

37 DeletedEntitiesQuery, 

38 find_orphaned_entities, 

39 get_available_classes, 

40 get_catalog_data, 

41 get_deleted_entities_with_filtering, 

42 get_triples_from_graph, 

43 import_entity_graph, 

44 import_referenced_entities, 

45) 

46from heritrace.utils.strategies import OrphanHandlingStrategy, ProxyHandlingStrategy 

47from heritrace.utils.uri_utils import generate_unique_uri, is_valid_url 

48from heritrace.utils.virtual_properties import transform_changes_with_virtual_properties 

49 

50 

@dataclass(frozen=True, slots=True)
class ChangeOperation:
    """Immutable bundle of arguments shared by the update/delete/order helpers."""

    # Editor instance the operation is applied through.
    editor: Editor
    # URI of the entity the operation targets.
    subject: URIRef
    # Named graph handed to the editor calls; None when no graph is given.
    graph_uri: URIRef | None = None
    # Forwarded as ``entity_types`` to validate_new_triple by the helpers
    # that validate a triple before touching the editor.
    entity_type: str | None = None
    # Shape identifier copied from the change payload when present.
    entity_shape: str | None = None

58 

59 

# Blueprint on which every route defined in this module is registered.
api_bp = Blueprint("api", __name__)

61 

62 

@api_bp.route("/catalogue")
@login_required
def catalogue_api() -> Response:
    """Return one page of catalogue data as JSON.

    Query parameters:
        class: class to filter on (optional).
        shape: shape to filter on (optional).
        page: page number; falls back to 1 when missing, not an integer,
            or lower than 1.
        per_page: page size; falls back to ``CATALOGUE_DEFAULT_PER_PAGE``
            when missing, not an integer, or not listed in
            ``CATALOGUE_ALLOWED_PER_PAGE``.
        sort_property: property to sort by; empty or ``"null"`` means none.
        sort_direction: defaults to ``"ASC"``.

    Returns:
        The dictionary produced by ``get_catalog_data`` with an extra
        ``available_classes`` entry, serialized with ``jsonify``.
    """
    selected_class = request.args.get("class")
    selected_shape = request.args.get("shape")
    # ``type=int`` returns the default when the value cannot be converted,
    # so a malformed query string no longer raises ValueError (HTTP 500).
    page = request.args.get("page", 1, type=int)
    # Pages are 1-based (the default is 1); clamp anything lower.
    page = max(page, 1)
    per_page = request.args.get("per_page", type=int)
    sort_property = request.args.get("sort_property")
    sort_direction = request.args.get("sort_direction", "ASC")

    # A missing or unparsable per_page is None here and therefore also
    # replaced by the configured default.
    allowed_per_page = current_app.config["CATALOGUE_ALLOWED_PER_PAGE"]
    if per_page not in allowed_per_page:
        per_page = current_app.config["CATALOGUE_DEFAULT_PER_PAGE"]

    # The frontend may send the literal string "null" for "no sorting".
    if not sort_property or sort_property.lower() == "null":
        sort_property = None

    available_classes = get_available_classes()

    catalog_data = get_catalog_data(
        CatalogQuery(
            selected_class=selected_class,
            page=page,
            per_page=per_page,
            sort_property=sort_property,
            sort_direction=sort_direction,
            selected_shape=selected_shape,
        ),
        available_classes,
    )

    catalog_data["available_classes"] = available_classes
    return jsonify(catalog_data)

98 

99 

@api_bp.route("/time-vault")
@login_required
def get_deleted_entities_api() -> Response:
    """Return one page of deleted entities as JSON, with sorting metadata.

    Filtering is delegated to ``get_deleted_entities_with_filtering``; per the
    original documentation only entities whose classes are marked as visible
    are processed and returned.

    Query parameters:
        class: class to filter on (optional).
        shape: shape to filter on (optional).
        page: page number; falls back to 1 when missing, not an integer,
            or lower than 1.
        per_page: page size; falls back to ``CATALOGUE_DEFAULT_PER_PAGE``
            when missing, not an integer, or not listed in
            ``CATALOGUE_ALLOWED_PER_PAGE``.
        sort_property: defaults to ``"deletionTime"``.
        sort_direction: defaults to ``"DESC"``.
    """
    selected_class = request.args.get("class")
    selected_shape = request.args.get("shape")
    # ``type=int`` returns the default when the value cannot be converted,
    # so a malformed query string no longer raises ValueError (HTTP 500).
    page = request.args.get("page", 1, type=int)
    # Pages are 1-based (the default is 1); clamp anything lower.
    page = max(page, 1)
    per_page = request.args.get("per_page", type=int)
    sort_property = request.args.get("sort_property", "deletionTime")
    sort_direction = request.args.get("sort_direction", "DESC")

    # A missing or unparsable per_page is None here and therefore also
    # replaced by the configured default.
    allowed_per_page = current_app.config["CATALOGUE_ALLOWED_PER_PAGE"]
    if per_page not in allowed_per_page:
        per_page = current_app.config["CATALOGUE_DEFAULT_PER_PAGE"]

    (
        deleted_entities,
        available_classes,
        selected_class,
        selected_shape,
        sortable_properties,
        total_count,
    ) = get_deleted_entities_with_filtering(
        DeletedEntitiesQuery(
            page,
            per_page,
            sort_property,
            sort_direction,
            selected_class,
            selected_shape,
        )
    )

    # Ceiling division; zero pages when there is nothing to show.
    total_pages = (total_count + per_page - 1) // per_page if total_count > 0 else 0

    return jsonify(
        {
            "entities": deleted_entities,
            "total_pages": total_pages,
            "current_page": page,
            "per_page": per_page,
            "total_count": total_count,
            "sort_property": sort_property,
            "sort_direction": sort_direction,
            "selected_class": selected_class,
            "selected_shape": selected_shape,
            "available_classes": available_classes,
            "sortable_properties": sortable_properties,
        }
    )

155 

156 

@api_bp.route("/check-lock", methods=["POST"])
@login_required
def check_lock() -> Response | tuple[Response, int]:
    """Tell the caller whether a resource is currently locked.

    Reads ``resource_uri`` from the JSON request body and asks the lock
    manager stored on ``g`` for its status.

    Returns:
        A 400 error payload when no URI was supplied, a ``"locked"`` payload
        naming the lock holder, a 500 error payload when the lock manager
        reports an error or any exception is raised, and otherwise
        ``{"status": "available"}``.
    """
    try:
        payload = request.get_json()
        uri = payload.get("resource_uri")

        if not uri:
            missing_uri = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri), 400

        status, lock_info = g.resource_lock_manager.check_lock_status(uri)

        if status == LockStatus.LOCKED:
            locked_body = {
                "status": "locked",
                "title": gettext("Resource Locked"),
                "message": gettext(
                    "This resource is currently being edited by"
                    " %(user)s [%(orcid)s]",
                    user=lock_info.user_name,
                    orcid=lock_info.user_id,
                ),
            }
            return jsonify(locked_body)

        if status == LockStatus.ERROR:
            failure_body = {
                "status": "error",
                "title": gettext("Error"),
                "message": gettext("An error occurred while checking the lock"),
            }
            return jsonify(failure_body), 500

        return jsonify({"status": "available"})

    except Exception:
        # Boundary handler: log with traceback, answer with a generic error.
        current_app.logger.exception("Error in check_lock")
        unexpected_body = {
            "status": "error",
            "title": gettext("Error"),
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

213 

214 

@api_bp.route("/acquire-lock", methods=["POST"])
@login_required
def acquire_lock() -> Response | tuple[Response, int]:
    """Attempt to lock a resource for the current request.

    Reads ``resource_uri`` and the optional ``linked_resources`` list from the
    JSON request body.

    Returns:
        400 when no URI was supplied, a ``"locked"`` payload (HTTP 200) when
        the status check reports the resource as locked, ``{"status":
        "success"}`` when the lock was acquired, 423 when acquisition failed,
        and 500 when any exception is raised.
    """
    try:
        payload = request.get_json()
        uri = payload.get("resource_uri")
        linked = payload.get("linked_resources", [])

        if not uri:
            missing_uri = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri), 400

        # Ask for the current status of the requested resource first, so the
        # client can be told who holds the lock.
        status, lock_info = g.resource_lock_manager.check_lock_status(uri)
        if status == LockStatus.LOCKED:
            locked_body = {
                "status": "locked",
                "title": gettext("Resource Locked"),
                "message": gettext(
                    "This resource is currently being edited by"
                    " %(user)s [%(orcid)s]",
                    user=lock_info.user_name,
                    orcid=lock_info.user_id,
                ),
            }
            return jsonify(locked_body), 200

        # The linked resources come straight from the request payload.
        if not g.resource_lock_manager.acquire_lock(uri, linked):
            refused_body = {
                "status": "error",
                "message": gettext("Resource is locked by another user"),
            }
            return jsonify(refused_body), 423

        return jsonify({"status": "success"})

    except Exception:
        # Boundary handler: log with traceback, answer with a generic error.
        current_app.logger.exception("Error in acquire_lock")
        unexpected_body = {
            "status": "error",
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

276 

277 

@api_bp.route("/release-lock", methods=["POST"])
@login_required
def release_lock() -> Response | tuple[Response, int]:
    """Give up the lock on the resource named in the JSON body.

    Returns:
        ``{"status": "success"}`` when the lock manager reports success, 400
        when ``resource_uri`` is missing or the release failed, and 500 when
        any exception is raised.
    """
    try:
        payload = request.get_json()
        uri = payload.get("resource_uri")

        if not uri:
            missing_uri = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri), 400

        if g.resource_lock_manager.release_lock(uri):
            return jsonify({"status": "success"})

        not_released = {
            "status": "error",
            "message": gettext("Unable to release lock"),
        }
        return jsonify(not_released), 400

    except Exception:
        # Boundary handler: log with traceback, answer with a generic error.
        current_app.logger.exception("Error in release_lock")
        unexpected_body = {
            "status": "error",
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

312 

313 

@api_bp.route("/renew-lock", methods=["POST"])
@login_required
def renew_lock() -> Response | tuple[Response, int]:
    """Refresh the lock on the resource named in the JSON body.

    Returns:
        ``{"status": "success"}`` when the lock manager reports success, 400
        when ``resource_uri`` is missing, 423 when the renewal failed, and 500
        when any exception is raised.
    """
    try:
        payload = request.get_json()
        uri = payload.get("resource_uri")

        if not uri:
            missing_uri = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri), 400

        # Renewal re-acquires the lock with an empty linked-resources list:
        # the linked resources are not checked again, only the existing lock
        # is refreshed.
        if g.resource_lock_manager.acquire_lock(uri, []):
            return jsonify({"status": "success"})

        not_renewed = {
            "status": "error",
            "message": gettext("Unable to renew lock"),
        }
        return jsonify(not_renewed), 423

    except Exception:
        # Boundary handler: log with traceback, answer with a generic error.
        current_app.logger.exception("Error in renew_lock")
        unexpected_body = {
            "status": "error",
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

350 

351 

@api_bp.route("/validate-literal", methods=["POST"])
@login_required
def validate_literal() -> tuple[Response, int]:
    """List the datatypes whose validator accepts the submitted literal.

    Expects a JSON body with a ``value`` entry. Responds 400 when the value is
    missing or falsy, or when no validator in ``DATATYPE_MAPPING`` accepts it,
    and 200 with ``valid_datatypes`` otherwise.
    """
    literal = request.json.get("value")
    if not literal:
        return jsonify({"error": gettext("Value is required.")}), 400

    # Each mapping entry is (datatype, validator, <unused third item>).
    accepted = [
        str(datatype)
        for datatype, is_valid, _ in DATATYPE_MAPPING
        if is_valid(literal)
    ]

    if accepted:
        return jsonify({"valid_datatypes": accepted}), 200
    return jsonify({"error": gettext("No matching datatypes found.")}), 400

369 

370 

def _collect_affected_entities(
    changes: list[dict],
    entity_type: str,
    *,
    check_for_orphans: bool,
    check_for_proxies: bool,
) -> tuple[list[dict[str, str]], list[dict[str, str]]]:
    """Gather the entities that the delete changes would leave orphaned.

    Args:
        changes: change dictionaries; only those whose ``action`` is
            ``"delete"`` are inspected.
        entity_type: passed through to ``find_orphaned_entities``.
        check_for_orphans: when true, collect the orphans that were found.
        check_for_proxies: when true, collect the intermediate entities
            that were found.

    Returns:
        ``(orphans, intermediate_orphans)``; a list stays empty when its
        flag is false.
    """
    collected_orphans: list[dict[str, str]] = []
    collected_intermediates: list[dict[str, str]] = []

    for change in changes:
        if change["action"] != "delete":
            continue

        target = URIRef(change["subject"])
        raw_predicate = change.get("predicate")
        found_orphans, found_intermediates = find_orphaned_entities(
            target,
            entity_type,
            URIRef(raw_predicate) if raw_predicate else None,
            change.get("object"),
        )
        if check_for_orphans:
            collected_orphans += found_orphans
        if check_for_proxies:
            collected_intermediates += found_intermediates

    return collected_orphans, collected_intermediates

393 

394 

def _format_orphan_response(
    orphans: list[dict[str, str]],
    intermediate_orphans: list[dict[str, str]],
    entity_shape: str | None,
    orphan_strategy: OrphanHandlingStrategy,
    proxy_strategy: ProxyHandlingStrategy,
) -> Response:
    """Build the JSON response that lists the affected entities.

    Each entity is rendered with its URI, human-readable label and class (both
    obtained from the custom filter) and an ``is_intermediate`` flag. Orphans
    come first, intermediate entities after them. ``should_delete`` is true
    only when both strategies are ``DELETE``.
    """
    custom_filter = get_custom_filter()

    affected_entities: list[dict[str, str | bool]] = []
    groups = ((orphans, False), (intermediate_orphans, True))
    for group, is_intermediate in groups:
        for entity in group:
            uri = entity["uri"]
            type_key = (entity["type"], entity_shape)
            affected_entities.append(
                {
                    "uri": uri,
                    "label": custom_filter.human_readable_entity(uri, type_key),
                    "type": custom_filter.human_readable_class(type_key),
                    "is_intermediate": is_intermediate,
                }
            )

    delete_orphans = orphan_strategy == OrphanHandlingStrategy.DELETE
    should_delete = delete_orphans and proxy_strategy == ProxyHandlingStrategy.DELETE

    payload = {
        "status": "success",
        "affected_entities": affected_entities,
        "should_delete": should_delete,
        "orphan_strategy": orphan_strategy.value,
        "proxy_strategy": proxy_strategy.value,
    }
    return jsonify(payload)

441 

442 

@api_bp.route("/check_orphans", methods=["POST"])
@login_required
def check_orphans() -> Response | tuple[Response, int]:
    """Report which entities would be affected by the submitted delete changes.

    Expects a JSON body with ``changes`` and ``entity_type`` (both required)
    and an optional ``entity_shape``. The configured orphan and proxy handling
    strategies (both defaulting to ``KEEP``) decide whether orphans and
    intermediate entities are searched for at all.

    Returns:
        400 with ``error_type`` ``"validation"`` when required fields are
        missing or a ``ValueError`` is raised; 500 with ``error_type``
        ``"system"`` on any other exception; otherwise a success payload
        whose ``affected_entities`` list is empty when nothing needs handling.
    """
    try:
        orphan_strategy = current_app.config.get(
            "ORPHAN_HANDLING_STRATEGY", OrphanHandlingStrategy.KEEP
        )
        proxy_strategy = current_app.config.get(
            "PROXY_HANDLING_STRATEGY", ProxyHandlingStrategy.KEEP
        )

        data = request.json
        if not data or "changes" not in data or "entity_type" not in data:
            return (
                jsonify(
                    {
                        "status": "error",
                        "error_type": "validation",
                        "message": gettext(
                            "Invalid request: 'changes' and"
                            " 'entity_type' are required fields"
                        ),
                    }
                ),
                400,
            )

        changes = data.get("changes", [])
        entity_type = data.get("entity_type")
        entity_shape = data.get("entity_shape")

        # Only DELETE and ASK need to know what would be affected.
        check_for_orphans = orphan_strategy in (
            OrphanHandlingStrategy.DELETE,
            OrphanHandlingStrategy.ASK,
        )
        check_for_proxies = proxy_strategy in (
            ProxyHandlingStrategy.DELETE,
            ProxyHandlingStrategy.ASK,
        )

        orphans: list[dict[str, str]] = []
        intermediate_orphans: list[dict[str, str]] = []
        if check_for_orphans or check_for_proxies:
            orphans, intermediate_orphans = _collect_affected_entities(
                changes,
                entity_type,
                check_for_orphans=check_for_orphans,
                check_for_proxies=check_for_proxies,
            )

        # Nothing to report when each kind is either kept or absent.
        if (orphan_strategy == OrphanHandlingStrategy.KEEP or not orphans) and (
            proxy_strategy == ProxyHandlingStrategy.KEEP or not intermediate_orphans
        ):
            return jsonify({"status": "success", "affected_entities": []})

        return _format_orphan_response(
            orphans,
            intermediate_orphans,
            entity_shape,
            orphan_strategy,
            proxy_strategy,
        )
    except ValueError as e:
        current_app.logger.warning("Validation error in check_orphans: %s", e)
        return (
            jsonify(
                {
                    "status": "error",
                    "error_type": "validation",
                    "message": gettext(
                        "An error occurred while checking for orphaned entities"
                    ),
                }
            ),
            400,
        )
    except Exception as e:
        # logger.exception already appends the active traceback, so it must
        # not be formatted into the message as well (it was logged twice).
        current_app.logger.exception("Error checking orphans: %s", e)
        return (
            jsonify(
                {
                    "status": "error",
                    "error_type": "system",
                    "message": gettext(
                        "An error occurred while checking for orphaned entities"
                    ),
                }
            ),
            500,
        )

537 

538 

def _parse_change_request(
    changes: list[dict],
) -> tuple[URIRef, list[dict], bool, str | None, bool]:
    """Extract the request-level options carried by the first change.

    Returns:
        ``(subject, affected_entities, delete_affected, primary_source,
        save_default_source)``. Missing keys (or an empty ``changes`` list)
        yield an empty-string subject URI, an empty list, ``False``, ``None``
        and ``False`` respectively.
    """
    head = changes[0] if changes else {}
    return (
        URIRef(head.get("subject", "")),
        head.get("affected_entities", []),
        head.get("delete_affected", False),
        head.get("primary_source"),
        head.get("save_default_source", False),
    )

555 

556 

def _setup_editor(
    primary_source: str | None,
    changes: list[dict],
    subject: URIRef,
) -> tuple[Editor, URIRef | None]:
    """Create an Editor for the current user and preload what the changes need.

    Steps, in order: build the editor from the application configuration,
    override its primary source when a valid URL was supplied, import the
    subject's graph (including referencing entities when a whole entity is
    being deleted), import the entities referenced by each create payload,
    and mark the preexisting data as finished.

    Returns:
        The editor and, for a quadstore dataset, the graph URI taken from the
        first quad found for ``subject`` (``None`` otherwise).
    """
    responsible_agent = get_responsible_agent_uri(current_user.orcid)
    endpoints = EndpointConfig(
        dataset=get_dataset_endpoint(),
        provenance=get_provenance_endpoint(),
        is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
    )
    editor = Editor(
        endpoints,
        current_app.config["COUNTER_HANDLER"],
        responsible_agent,
        current_app.config["PRIMARY_SOURCE"],
        current_app.config["DATASET_GENERATION_TIME"],
    )

    if primary_source and is_valid_url(primary_source):
        editor.set_primary_source(URIRef(primary_source))

    # A delete without a predicate removes the whole entity.
    deletes_whole_entity = False
    for change in changes:
        if change["action"] == "delete" and not change.get("predicate"):
            deletes_whole_entity = True
            break

    editor = import_entity_graph(
        editor, subject, include_referencing_entities=deletes_whole_entity
    )

    for change in changes:
        if change["action"] != "create":
            continue
        payload = change.get("data")
        if payload:
            import_referenced_entities(editor, payload)

    editor.preexisting_finished()

    if not editor.dataset_is_quadstore:
        return editor, None

    # Only the first quad matters: its context identifies the subject's graph.
    quads = editor.g_set.quads((subject, None, None, None))  # type: ignore[union-attr]
    first_quad = next(iter(quads), None)
    if first_quad is None:
        return editor, None
    return editor, get_graph_uri_from_context(cast("Graph | URIRef", first_quad[3]))

602 

603 

def _process_creates(
    editor: Editor,
    changes: list[dict],
    graph_uri: URIRef | None,
    subject: URIRef,
) -> tuple[dict[str, str], URIRef]:
    """Run ``create_logic`` for every create change that carries data.

    Returns:
        The temporary-id to URI mapping filled in by ``create_logic`` and the
        subject to use afterwards: the incoming ``subject`` unless a create
        change named its own subject, in which case the URI returned for the
        last such change.
    """
    temp_id_to_uri: dict[str, str] = {}

    for change in changes:
        if change["action"] != "create":
            continue
        payload = change.get("data")
        if not payload:
            continue

        raw_subject = change.get("subject")
        explicit_subject = URIRef(raw_subject) if raw_subject else None
        created = create_logic(
            editor,
            payload,
            explicit_subject,
            graph_uri,
            temp_id_to_uri=temp_id_to_uri,
            parent_entity_type=None,
        )
        if explicit_subject is not None:
            subject = created

    return temp_id_to_uri, subject

630 

631 

def _handle_affected_entities(
    editor: Editor,
    affected_entities: list[dict],
    *,
    delete_affected: bool,
    graph_uri: URIRef | None,
    deleted_entities: set[URIRef],
) -> None:
    """Delete orphaned and intermediate (proxy) entities when requested.

    Deletion happens in two phases across the request: this function removes
    whole orphan/proxy entities first; specific triples are removed later by
    the caller. Nothing is done unless ``affected_entities`` is non-empty and
    ``delete_affected`` is true.

    Args:
        editor: editor the deletions are applied through.
        affected_entities: entity dictionaries with a ``uri`` and an optional
            ``is_intermediate`` flag.
        delete_affected: whether the client asked for affected entities to be
            deleted.
        graph_uri: graph passed along with each deletion.
        deleted_entities: URIs already deleted; updated in place so that no
            entity is deleted twice.
    """
    orphan_strategy = current_app.config.get(
        "ORPHAN_HANDLING_STRATEGY", OrphanHandlingStrategy.KEEP
    )
    proxy_strategy = current_app.config.get(
        "PROXY_HANDLING_STRATEGY", ProxyHandlingStrategy.KEEP
    )

    if not affected_entities or not delete_affected:
        return

    # Split the affected entities into plain orphans and proxy entities.
    orphans: list[dict] = []
    proxies: list[dict] = []
    for entity in affected_entities:
        bucket = proxies if entity.get("is_intermediate") else orphans
        bucket.append(entity)

    # Each kind follows its own strategy: DELETE always deletes, ASK deletes
    # only when the client confirmed it.
    delete_orphans = orphan_strategy == OrphanHandlingStrategy.DELETE or (
        orphan_strategy == OrphanHandlingStrategy.ASK and delete_affected
    )
    delete_proxies = proxy_strategy == ProxyHandlingStrategy.DELETE or (
        proxy_strategy == ProxyHandlingStrategy.ASK and delete_affected
    )

    # Orphans are handled before proxies.
    for enabled, group in ((delete_orphans, orphans), (delete_proxies, proxies)):
        if not enabled:
            continue
        for entity in group:
            entity_uri = URIRef(entity["uri"])
            if entity_uri in deleted_entities:
                continue
            delete_logic(
                ChangeOperation(editor=editor, subject=entity_uri, graph_uri=graph_uri)
            )
            deleted_entities.add(entity_uri)

691 

692 

693def _process_remaining_changes( 

694 editor: Editor, 

695 changes: list[dict], 

696 graph_uri: URIRef | None, 

697 deleted_entities: set[URIRef], 

698 temp_id_to_uri: dict[str, str], 

699) -> None: 

700 for change in changes: 

701 if change["action"] == "delete": 

702 _process_delete_change(editor, change, graph_uri, deleted_entities) 

703 elif change["action"] == "update": 

704 op = ChangeOperation( 

705 editor=editor, 

706 subject=URIRef(change["subject"]), 

707 graph_uri=graph_uri, 

708 entity_type=change.get("entity_type"), 

709 entity_shape=change.get("entity_shape"), 

710 ) 

711 update_logic( 

712 op, 

713 URIRef(change["predicate"]), 

714 change["object"], 

715 change["newObject"], 

716 ) 

717 elif change["action"] == "order": 

718 op = ChangeOperation( 

719 editor=editor, 

720 subject=URIRef(change["subject"]), 

721 graph_uri=graph_uri, 

722 ) 

723 order_logic( 

724 op, 

725 URIRef(change["predicate"]), 

726 change["object"], 

727 URIRef(change["newObject"]), 

728 temp_id_to_uri, 

729 ) 

730 

731 

def _process_delete_change(
    editor: Editor,
    change: dict,
    graph_uri: URIRef | None,
    deleted_entities: set[URIRef],
) -> None:
    """Apply one delete change, skipping anything that is already gone.

    Without a predicate the whole subject entity is deleted (once) and
    recorded in ``deleted_entities``. With a predicate and an object, the
    single triple is deleted unless the object itself was already deleted.
    A predicate without an object results in no action.
    """
    target = URIRef(change["subject"])
    raw_predicate = change.get("predicate")
    predicate = URIRef(raw_predicate) if raw_predicate else None
    obj = change.get("object")

    operation = ChangeOperation(
        editor=editor,
        subject=target,
        graph_uri=graph_uri,
        entity_type=change.get("entity_type"),
        entity_shape=change.get("entity_shape"),
    )

    if not predicate:
        if target not in deleted_entities:
            delete_logic(operation)
            deleted_entities.add(target)
        return

    if obj and URIRef(obj) not in deleted_entities:
        delete_logic(operation, predicate, obj)

761 

762 

def _save_and_respond(editor: Editor) -> tuple[Response, int]:
    """Persist the editor's pending changes and build the HTTP response.

    Raises:
        ValueError: re-raised after logging so the caller can report it as a
            validation error.

    Returns:
        A 200 success payload, or a 500 ``"database"`` error payload when
        saving fails with any other exception.
    """
    try:
        editor.save()
    except ValueError:
        current_app.logger.exception("Error during save operation")
        raise
    except Exception as save_error:
        current_app.logger.exception("Error during save operation")
        failure_text = gettext("Failed to save changes to the database: {}").format(
            str(save_error)
        )
        failure_body = {
            "status": "error",
            "error_type": "database",
            "message": failure_text,
        }
        return jsonify(failure_body), 500
    else:
        success_body = {
            "status": "success",
            "message": gettext("Changes applied successfully"),
        }
        return jsonify(success_body), 200

790 

791 

@api_bp.route("/apply_changes", methods=["POST"])
@login_required
def apply_changes() -> tuple[Response, int]:
    """Apply a batch of changes to entities and save them.

    Request body: a JSON list of change objects. Every change has an
    ``action`` (``"create"``, ``"delete"``, ``"update"`` or ``"order"``) plus
    the fields that action needs. Request-level options are read from the
    FIRST change only:
        "subject": (str) Main entity URI being modified,
        "primary_source": (str) Primary source to use for provenance,
        "save_default_source": (bool) Whether to save primary_source as
            default for current user,
        "affected_entities": (list) Entities potentially affected by delete
            operations,
        "delete_affected": (bool) Whether to delete affected entities

    Processing order: creates, then affected (orphan/proxy) entities, then
    deletes/updates/orders, then a single save.

    Responses:
        200 OK: Changes applied successfully
        400 Bad Request: Invalid request or validation error
        500 Internal Server Error: Server error while applying changes
    """
    try:
        changes = request.get_json()
        if not changes:
            return jsonify({"error": "No request data provided"}), 400

        # Everything below indexes the body as a list of dictionaries; reject
        # any other shape here instead of failing later with a 500.
        if not isinstance(changes, list) or not all(
            isinstance(change, dict) for change in changes
        ):
            return (
                jsonify({"error": "Request body must be a list of change objects"}),
                400,
            )

        (
            subject,
            affected_entities,
            delete_affected,
            primary_source,
            save_default_source,
        ) = _parse_change_request(changes)

        if primary_source and not is_valid_url(primary_source):
            return jsonify({"error": "Invalid primary source URL"}), 400

        # A non-empty primary_source is known to be a valid URL at this point.
        if save_default_source and primary_source:
            save_user_default_primary_source(current_user.orcid, primary_source)

        changes = transform_changes_with_virtual_properties(changes)

        editor, graph_uri = _setup_editor(primary_source, changes, subject)

        temp_id_to_uri, subject = _process_creates(editor, changes, graph_uri, subject)

        # Shared across the two deletion phases so nothing is deleted twice.
        deleted_entities: set[URIRef] = set()
        _handle_affected_entities(
            editor,
            affected_entities,
            delete_affected=delete_affected,
            graph_uri=graph_uri,
            deleted_entities=deleted_entities,
        )
        _process_remaining_changes(
            editor, changes, graph_uri, deleted_entities, temp_id_to_uri
        )

        return _save_and_respond(editor)

    except ValueError as e:
        error_message = str(e)
        current_app.logger.warning("Validation error: %s", error_message)
        return (
            jsonify(
                {
                    "status": "error",
                    "error_type": "validation",
                    "message": error_message,
                }
            ),
            400,
        )
    except Exception as e:
        # logger.exception already appends the active traceback, so it must
        # not be formatted into the message as well (it was logged twice).
        current_app.logger.exception("Error while applying changes: %s", e)
        return (
            jsonify(
                {
                    "status": "error",
                    "error_type": "system",
                    "message": gettext("An error occurred while applying changes"),
                }
            ),
            500,
        )

878 

879 

def get_graph_uri_from_context(graph_context: Graph | URIRef) -> URIRef:
    """Return the graph URI for a quad context.

    A ``Graph`` contributes its ``identifier``; anything else is returned
    as given.
    """
    is_graph = isinstance(graph_context, Graph)
    identifier = graph_context.identifier if is_graph else graph_context
    return cast("URIRef", identifier)

884 

885 

def determine_datatype(value: str, datatype_uris: list[str]) -> URIRef:
    """Pick the first candidate datatype whose validator accepts ``value``.

    Candidates are tried in the order given. For each one, the first entry of
    ``DATATYPE_MAPPING`` with the same URI supplies the validator. Falls back
    to ``XSD.string`` when no candidate validates.
    """
    for candidate in datatype_uris:
        wanted = str(candidate)
        for entry in DATATYPE_MAPPING:
            if str(entry[0]) != wanted:
                continue
            # Only the first mapping entry for a URI is consulted.
            validator = entry[1]
            if validator and validator(value):
                return URIRef(candidate)
            break
    return XSD.string

895 

896 

class CreateEntityData(TypedDict, total=False):
    """Payload describing an entity to create, as consumed by create_logic.

    Every key is optional (``total=False``).
    """

    # Class of the entity; create_logic uses it for URI generation and as
    # the entity type of the creation context.
    entity_type: str
    # Maps predicate URI strings to one value or a list of values.
    # TODO(arcangelo): tighten this type after normalizing
    # the frontend payload to a consistent shape
    properties: dict[str, list | dict | str]
    # Temporary identifier; create_logic records tempId -> created URI.
    tempId: str

903 

904 

@dataclass
class _CreateContext:
    """State shared by the helpers that create one entity's triples."""

    # Editor the new triples are created through.
    editor: Editor
    # Graph passed to every editor.create call; may be None.
    graph_uri: URIRef | None
    # Type of the entity being created; forwarded as ``entity_types`` to
    # validate_new_triple and as the parent type of nested entities.
    entity_type: str | None
    # Temporary-id to URI mapping handed down to nested create_logic calls.
    temp_id_to_uri: dict[str, str] | None

911 

912 

def _handle_property_value(
    ctx: _CreateContext,
    value: dict | str,
    subject: URIRef,
    predicate: URIRef,
) -> None:
    """Create the triple(s) for one property value of a new entity.

    Four kinds of value are recognized, checked in this order:
    a nested entity (dict with ``entity_type``), a reference to an existing
    entity (``is_existing_entity``), a custom property
    (``is_custom_property``), and anything else, which is validated as a
    plain value.

    Raises:
        ValueError: when an existing-entity reference has no ``entity_uri``,
            a custom property has an unknown ``type``, or validation reports
            an error.
    """
    if isinstance(value, dict):
        if "entity_type" in value:
            # Nested entity: create it under a fresh URI, linked to this one.
            nested_subject = generate_unique_uri(value["entity_type"])
            create_logic(
                ctx.editor,
                cast("CreateEntityData", value),
                nested_subject,
                ctx.graph_uri,
                subject,
                predicate,
                ctx.temp_id_to_uri,
                parent_entity_type=ctx.entity_type,
            )
            return

        if value.get("is_existing_entity", False):
            existing_uri = value.get("entity_uri")
            if not existing_uri:
                msg = "Missing entity_uri in existing entity reference"
                raise ValueError(msg)
            ctx.editor.create(subject, predicate, URIRef(existing_uri), ctx.graph_uri)
            return

        if value.get("is_custom_property", False):
            kind = value["type"]
            if kind == "uri":
                custom_object = URIRef(value["value"])
            elif kind == "literal":
                if "datatype" in value:
                    datatype = URIRef(value["datatype"])
                else:
                    datatype = XSD.string
                custom_object = Literal(value["value"], datatype=datatype)
            else:
                msg = f"Unknown custom property type: {value['type']}"
                raise ValueError(msg)
            ctx.editor.create(subject, predicate, custom_object, ctx.graph_uri)
            return

    # Plain value (or a dict matching none of the cases above): validate its
    # string form and create the triple only when validation yields an object.
    validated_object, _, error_message = validate_new_triple(
        subject,
        predicate,
        str(value),
        "create",
        entity_types=ctx.entity_type,
    )
    if error_message:
        raise ValueError(error_message)
    if validated_object is not None:
        ctx.editor.create(subject, predicate, validated_object, ctx.graph_uri)

962 

963 

def _setup_parent_relations(
    ctx: _CreateContext,
    subject: URIRef,
    parent_subject: URIRef,
    parent_predicate: URIRef | None,
    parent_entity_type: str | None,
) -> None:
    """Type a nested entity and link it to its parent.

    First validates and creates the ``rdf:type`` triple of ``subject``; then,
    when a parent predicate is given, validates and creates the triple that
    connects ``parent_subject`` to ``subject``.

    Raises:
        ValueError: when either validation reports an error.
    """
    validated_type, _, type_error = validate_new_triple(
        subject, RDF.type, ctx.entity_type, "create", entity_types=ctx.entity_type
    )
    if type_error:
        raise ValueError(type_error)
    if validated_type is not None:
        ctx.editor.create(subject, RDF.type, validated_type, ctx.graph_uri)

    if not parent_predicate:
        return

    validated_link, _, link_error = validate_new_triple(
        parent_subject,
        parent_predicate,
        subject,
        "create",
        entity_types=parent_entity_type,
    )
    if link_error:
        raise ValueError(link_error)
    if validated_link is not None:
        ctx.editor.create(
            parent_subject, parent_predicate, validated_link, ctx.graph_uri
        )

995 

996 

def create_logic(  # noqa: PLR0913
    editor: Editor,
    data: CreateEntityData,
    subject: URIRef | None = None,
    graph_uri: URIRef | None = None,
    parent_subject: URIRef | None = None,
    parent_predicate: URIRef | None = None,
    temp_id_to_uri: dict[str, str] | None = None,
    parent_entity_type: str | None = None,
) -> URIRef:
    """Create an entity, and any nested entities, from a creation payload.

    Args:
        editor: editor the triples are created through.
        data: payload with optional ``entity_type``, ``properties`` and
            ``tempId`` entries.
        subject: URI to create the entity under; generated when ``None``.
        graph_uri: graph passed to every creation.
        parent_subject: when given, the entity is typed and linked to this
            parent before its properties are processed.
        parent_predicate: predicate linking the parent to the entity.
        temp_id_to_uri: when given, receives ``tempId -> URI`` for this
            entity if the payload has a ``tempId``.
        parent_entity_type: entity type used to validate the parent link.

    Returns:
        The URI the entity was created under.
    """
    entity_type: str | None = data.get("entity_type")
    properties: dict = data.get("properties", {})
    temp_id: str | None = data.get("tempId")

    if subject is None:
        subject = generate_unique_uri(entity_type, cast("dict", data))

    if temp_id_to_uri is not None and temp_id:
        temp_id_to_uri[temp_id] = str(subject)

    context = _CreateContext(editor, graph_uri, entity_type, temp_id_to_uri)

    if parent_subject is not None:
        _setup_parent_relations(
            context, subject, parent_subject, parent_predicate, parent_entity_type
        )

    for predicate_str, raw_values in properties.items():
        predicate = URIRef(predicate_str)
        # A property may carry a single value or a list of values.
        if not isinstance(raw_values, list):
            raw_values = [raw_values]
        for item in raw_values:
            _handle_property_value(context, item, subject, predicate)

    return subject

1031 

1032 

def update_logic(
    op: ChangeOperation,
    predicate: URIRef,
    old_value: str,
    new_value: str,
) -> None:
    """Validate a value replacement and apply it through the editor.

    The old value is treated as a URI when it is a valid URL and as a literal
    otherwise; validation then supplies the RDF terms handed to the editor.

    Raises:
        ValueError: when validation reports an error.
    """
    previous: URIRef | Literal
    if is_valid_url(old_value):
        previous = URIRef(old_value)
    else:
        previous = Literal(old_value)

    checked_new, checked_old, error_message = validate_new_triple(
        op.subject,
        predicate,
        new_value,
        "update",
        previous,
        entity_types=op.entity_type,
    )
    if error_message:
        raise ValueError(error_message)

    op.editor.update(
        op.subject,
        predicate,
        cast("Literal | URIRef", checked_old),
        cast("Literal | URIRef", checked_new),
        op.graph_uri,
    )

1060 

1061 

def rebuild_entity_order(
    editor: Editor,
    ordered_by_uri: URIRef,
    entities: list[URIRef],
    graph_uri: URIRef | None = None,
) -> Editor:
    """
    Rewrite the ``ordered_by_uri`` chain so it follows ``entities``.

    Args:
        editor: Editor whose graph set is read and modified.
        ordered_by_uri: Property linking each entity to the next one.
        entities: Entities in the desired order.
        graph_uri: Graph passed through to every delete/create call.

    Returns:
        The same ``editor`` instance that was passed in.
    """
    # Step 1: drop every existing ordering triple of the listed entities.
    for member in entities:
        pattern = (member, ordered_by_uri, None)
        # Snapshot the matches first because the editor is modified while
        # walking over them.
        stale_links = list(get_triples_from_graph(editor.g_set, pattern))
        for _subj, _pred, successor in stale_links:
            editor.delete(
                member, ordered_by_uri, cast("Literal | URIRef", successor), graph_uri
            )

    # Step 2: link each entity to the one that follows it; the last entity
    # gets no outgoing ordering triple.
    for predecessor, follower in zip(entities, entities[1:]):
        editor.create(predecessor, ordered_by_uri, follower, graph_uri)

    return editor

1083 

1084 

def delete_logic(
    op: ChangeOperation,
    predicate: URIRef | None = None,
    object_value: str | None = None,
) -> None:
    """
    Delete a triple (or a broader pattern) for the subject of ``op``.

    When both ``predicate`` and ``object_value`` are given, the value is
    validated through ``validate_new_triple`` and the resolved term is passed
    to the editor. Otherwise the editor is called with a ``None`` object.

    Args:
        op: Operation context supplying the editor, subject, graph URI and
            entity type.
        predicate: Property to delete, if any.
        object_value: Value to delete as a plain string, if any.

    Raises:
        ValueError: When ``validate_new_triple`` reports an error message.
    """
    term_to_remove: URIRef | Literal | None = None

    if predicate and object_value:
        # Interpret the raw string as a URI when it looks like one, otherwise
        # as a literal.
        raw_term: URIRef | Literal
        if is_valid_url(object_value):
            raw_term = URIRef(object_value)
        else:
            raw_term = Literal(object_value)

        _, term_to_remove, problem = validate_new_triple(
            op.subject,
            predicate,
            None,
            "delete",
            raw_term,
            entity_types=op.entity_type,
        )
        if problem:
            raise ValueError(problem)

    op.editor.delete(
        op.subject,
        predicate,
        cast("Literal | URIRef | None", term_to_remove),
        op.graph_uri,
    )

1114 

1115 

def order_logic(
    op: ChangeOperation,
    predicate: URIRef,
    new_order: list[str],
    ordered_by: URIRef,
    temp_id_to_uri: dict[str, str] | None = None,
) -> Editor:
    """
    Re-create the ordered entities of ``predicate`` and chain them anew.

    Every current object of ``(op.subject, predicate)`` whose string form
    appears in ``new_order`` is replaced by a freshly generated URI of the same
    RDF type. Its properties are copied over, except ``predicate`` and
    ``ordered_by``. The ordering chain is then rebuilt following ``new_order``.

    Args:
        op: Operation context supplying the editor, subject and graph URI.
        predicate: Property linking the subject to the ordered entities.
        new_order: Desired order, as entity URIs or temporary ids.
        ordered_by: Property linking each ordered entity to the next one.
        temp_id_to_uri: Optional mapping from temporary ids to real URIs, used
            for entries of ``new_order`` that were not re-created here.

    Returns:
        The editor carried by ``op``.

    Raises:
        ValueError: When a re-created entity has no ``rdf:type`` triple.
    """
    # Materialise the current objects up front: the graph is edited below.
    existing_members = [
        obj
        for _, _, obj in get_triples_from_graph(
            op.editor.g_set, (op.subject, predicate, None)
        )
    ]

    replacements = {}
    # Properties that must not be copied onto the replacement entity.
    not_copied = (predicate, ordered_by)

    for member in existing_members:
        if str(member) not in new_order:
            continue

        member_triples = list(
            get_triples_from_graph(
                op.editor.g_set,
                (cast("URIRef", member), None, None),
            )
        )

        # First rdf:type object found, or None when the entity is untyped.
        member_type = next(
            (obj for _, prop, obj in member_triples if prop == RDF.type), None
        )
        if member_type is None:
            msg = f"Impossibile determinare il tipo dell'entità per {member}"
            raise ValueError(msg)

        replacement_uri = generate_unique_uri(str(member_type))
        replacements[member] = replacement_uri

        # Unlink the old entity from the subject, then delete the old entity
        # itself (editor.delete called with only a subject and graph).
        op.editor.delete(
            op.subject,
            predicate,
            cast("Literal | URIRef", member),
            op.graph_uri,
        )
        op.editor.delete(cast("URIRef", member), graph=op.graph_uri)

        op.editor.create(op.subject, predicate, replacement_uri, op.graph_uri)

        for _, prop, obj in member_triples:
            if prop in not_copied:
                continue
            op.editor.create(
                replacement_uri,
                cast("URIRef", prop),
                cast("Literal | URIRef", obj),
                op.graph_uri,
            )

    final_sequence = []
    for item in new_order:
        resolved = replacements.get(URIRef(item))
        if not resolved:
            # Not re-created above: resolve through the temp-id map when one
            # was supplied, otherwise use the entry as a URI directly.
            if temp_id_to_uri:
                resolved = URIRef(temp_id_to_uri.get(item, item))
            else:
                resolved = URIRef(item)
        final_sequence.append(resolved)

    if final_sequence:
        rebuild_entity_order(op.editor, ordered_by, final_sequence, op.graph_uri)

    return op.editor

1184 

1185 

@api_bp.route("/human-readable-entity", methods=["POST"])
@login_required
def get_human_readable_entity() -> str | tuple[Response, int]:
    """
    Return the human-readable rendering of an entity.

    Expects the form fields ``uri`` and ``entity_class``. Responds with a 400
    JSON error when either one is absent; otherwise returns whatever the
    custom filter's ``human_readable_entity`` produces for the URI and its
    (class, shape) pair.
    """
    custom_filter = get_custom_filter()
    form = request.form

    # Both form fields are mandatory.
    if not ("uri" in form and "entity_class" in form):
        error_payload = {"status": "error", "message": "Missing required parameters"}
        return jsonify(error_payload), 400

    entity_class = form["entity_class"]
    entity_key = (entity_class, determine_shape_for_classes([entity_class]))
    return custom_filter.human_readable_entity(form["uri"], entity_key)

1202 

1203 

@api_bp.route("/format-source", methods=["POST"])
@login_required
def format_source_api() -> Response | tuple[Response, int]:
    """
    API endpoint to format a source URL using the application's filters.

    Accepts a POST request with JSON body: {"url": "source_url"}

    Returns:
        JSON {"formatted_html": "html_string"} on success, or a plain anchor
        built from the (HTML-escaped) URL when the custom filter raises.
        JSON {"error": ...} with status 400 when the body is not a JSON object
        or the URL is missing, not a string, or not a valid URL.
    """
    # Local import keeps this block self-contained (stdlib only).
    from html import escape

    data = request.get_json()
    # A JSON body of `null`, a list or a scalar has no "url" key to read.
    source_url = data.get("url") if isinstance(data, dict) else None

    if (
        not source_url
        or not isinstance(source_url, str)
        or not is_valid_url(source_url)
    ):
        return jsonify({"error": gettext("Invalid or missing URL")}), 400

    try:
        custom_filter = get_custom_filter()
        formatted_html = custom_filter.format_source_reference(source_url)
        return jsonify({"formatted_html": formatted_html})
    except Exception:
        current_app.logger.exception(
            "Error formatting source URL '%s'",
            source_url,
        )
        # The URL comes from the client: escape it before embedding it in
        # markup so quotes or angle brackets cannot break out of the attribute
        # or inject elements.
        safe_url = escape(source_url, quote=True)
        fallback_html = f'<a href="{safe_url}" target="_blank">{safe_url}</a>'
        return jsonify({"formatted_html": fallback_html})

1229 

1230 

@api_bp.route("/form-fields", methods=["GET"])
@login_required
def get_form_fields_for_entity() -> Response | tuple[Response, int]:
    """
    Return the form fields registered for one (entity class, shape) pair.

    Query parameters:
        entity_class: URI of the entity class
        entity_shape: URI of the entity shape

    Returns:
        JSON with ``status``, ``form_fields`` (a list of
        ``[property, details]`` pairs in the registry's iteration order) and
        ``entity_key``. Error responses use status 400 (missing parameter),
        404 (unknown pair) or 500 (registry empty or unexpected failure).
    """
    try:
        entity_class_decoded = request.args.get("entity_class")
        entity_shape_decoded = request.args.get("entity_shape")

        if not (entity_class_decoded and entity_shape_decoded):
            missing_params = {
                "status": "error",
                "message": (
                    "Missing required parameters: entity_class and entity_shape"
                ),
            }
            return jsonify(missing_params), 400

        all_form_fields = get_form_fields()
        if not all_form_fields:
            not_ready = {"status": "error", "message": "Form fields not initialized"}
            return jsonify(not_ready), 500

        entity_key = (entity_class_decoded, entity_shape_decoded)
        if entity_key not in all_form_fields:
            not_found = {
                "status": "error",
                "message": (
                    "No form fields found for entity class"
                    f" {entity_class_decoded} with shape {entity_shape_decoded}"
                ),
            }
            return jsonify(not_found), 404

        # Emit [property, details] pairs as a list so the mapping's order
        # survives JSON serialisation.
        ordered_properties = [
            [prop, details_list]
            for prop, details_list in all_form_fields[entity_key].items()
        ]

        return jsonify(
            {
                "status": "success",
                "form_fields": ordered_properties,
                "entity_key": [entity_class_decoded, entity_shape_decoded],
            }
        )

    except Exception as e:
        current_app.logger.exception(
            "Error loading form fields for %s/%s",
            entity_class_decoded,
            entity_shape_decoded,
        )
        failure = {"status": "error", "message": f"Failed to load form fields: {e!s}"}
        return jsonify(failure), 500

1307 

1308 

@api_bp.route("/render-form-fields", methods=["POST"])
@login_required
def render_form_fields_html() -> str | tuple[Response, int]:
    """
    Render form fields as HTML for dynamic loading.

    Expects JSON payload with:
        - entity_key: [entity_class, entity_shape] array

    Returns:
        HTML string of the rendered form fields, or a JSON error with status
        400 (missing or malformed ``entity_key``), 404 (unknown pair) or
        500 (registry empty or unexpected failure).
    """
    try:
        data = request.get_json()

        if not data or "entity_key" not in data:
            return jsonify(
                {"status": "error", "message": "Missing required field: entity_key"}
            ), 400

        entity_key = data["entity_key"]  # This is [entity_class, entity_shape] array

        # A malformed key is a client error: reject it here instead of letting
        # the unpacking or the dict lookup fail and surface as a 500.
        if (
            not isinstance(entity_key, (list, tuple))
            or len(entity_key) != 2
            or not all(isinstance(part, str) for part in entity_key)
        ):
            return jsonify(
                {
                    "status": "error",
                    "message": (
                        "Invalid entity_key: expected [entity_class, entity_shape]"
                    ),
                }
            ), 400

        entity_class, entity_shape = entity_key

        all_form_fields = get_form_fields()

        if not all_form_fields:
            return jsonify(
                {"status": "error", "message": "Form fields not initialized"}
            ), 500

        tuple_key = (entity_class, entity_shape)
        if tuple_key not in all_form_fields:
            return jsonify(
                {
                    "status": "error",
                    "message": (
                        f"No form fields found for entity"
                        f" {entity_class} with shape"
                        f" {entity_shape}"
                    ),
                }
            ), 404

        entity_form_fields = all_form_fields[tuple_key]

        # List of [property, details] pairs, in the mapping's iteration order.
        form_fields_array = [
            [prop, details_list] for prop, details_list in entity_form_fields.items()
        ]

        template_string = """
        {% from 'macros.jinja' import render_form_field with context %}

        {% set entity_type = entity_class %}
        {% set entity_shape = entity_shape %}
        {% set group_id = ((entity_type, entity_shape) | human_readable_class +
            "_group") | replace(" ", "_") %}
        <div class="property-group mb-3" id="{{ group_id }}" data-uri="{{ entity_type
            }}" data-shape="{{ entity_shape }}">
            {% for prop_data in ordered_form_fields %}
                {% set prop = prop_data[0] %}
                {% set details_list = prop_data[1] %}
                {% for details in details_list %}
                    {{ render_form_field(entity_type, prop, details, all_form_fields) }}
                {% endfor %}
            {% endfor %}
        </div>
        """

        return render_template_string(
            template_string,
            entity_class=entity_class,
            entity_shape=entity_shape,
            ordered_form_fields=form_fields_array,
            all_form_fields=all_form_fields,
        )

    except Exception as e:
        current_app.logger.exception("Error rendering form fields HTML")

        return jsonify(
            {"status": "error", "message": f"Failed to render form fields: {e!s}"}
        ), 500

1391 

1392 

def _validate_nested_form_request() -> (
    tuple[str, str, str, str, str, int, bool, dict] | tuple[Response, int]
):
    """
    Validate the JSON payload of a nested-form rendering request.

    Returns:
        On success, the tuple ``(parent_entity_class, parent_entity_shape,
        entity_class, entity_shape, predicate_uri, depth, is_template,
        all_form_fields)``. On failure, a ``(Response, status)`` pair:
        400 for a missing body, missing fields or a non-integer ``depth``,
        404 when the parent key or the predicate is unknown, and 500 when the
        form-field registry is empty.
    """
    data = request.get_json()

    required_fields = [
        "parent_entity_class",
        "parent_entity_shape",
        "entity_class",
        "entity_shape",
        "predicate_uri",
        "depth",
    ]

    if not data:
        return jsonify({"status": "error", "message": "No JSON data provided"}), 400

    missing_fields = [field for field in required_fields if field not in data]
    if missing_fields:
        # Report only the fields that are actually absent, not the full list
        # of required ones.
        return jsonify(
            {
                "status": "error",
                "message": f"Missing required fields: {', '.join(missing_fields)}",
            }
        ), 400

    parent_entity_class = data["parent_entity_class"]
    parent_entity_shape = data["parent_entity_shape"]
    entity_class = data["entity_class"]
    entity_shape = data["entity_shape"]
    predicate_uri = data["predicate_uri"]
    try:
        depth = int(data["depth"])
    except (TypeError, ValueError):
        # A depth that cannot be read as an integer is a client error.
        return jsonify(
            {"status": "error", "message": "Invalid depth: expected an integer"}
        ), 400
    is_template = data.get("is_template", False)

    all_form_fields = get_form_fields()

    if not all_form_fields:
        return jsonify(
            {"status": "error", "message": "Form fields not initialized"}
        ), 500

    parent_entity_key = (parent_entity_class, parent_entity_shape)
    if parent_entity_key not in all_form_fields:
        return jsonify(
            {
                "status": "error",
                "message": (
                    "No form fields found for parent"
                    f" entity {parent_entity_class}"
                    f" with shape {parent_entity_shape}"
                ),
            }
        ), 404

    parent_fields = all_form_fields[parent_entity_key]
    if predicate_uri not in parent_fields:
        return jsonify(
            {
                "status": "error",
                "message": (
                    "No field definition found for"
                    f" predicate {predicate_uri}"
                    " in parent entity"
                ),
            }
        ), 404

    return (
        parent_entity_class,
        parent_entity_shape,
        entity_class,
        entity_shape,
        predicate_uri,
        depth,
        is_template,
        all_form_fields,
    )

1470 

1471 

@api_bp.route("/render-nested-form", methods=["POST"])
@login_required
def render_nested_form_html() -> str | tuple[Response, int]:
    """
    Render the HTML of one nested form field.

    The request is checked by ``_validate_nested_form_request``. The parent's
    field definitions for the predicate are then searched, inside their
    ``"or"`` alternatives, for the entry whose ``entityType`` and ``nodeShape``
    match the requested class and shape. That entry is rendered through the
    ``render_form_field`` macro.

    Returns:
        The rendered HTML, the validator's error response, a 404 JSON error
        when no alternative matches, or a 500 JSON error on any exception.
    """
    try:
        validated = _validate_nested_form_request()
        # The validator signals failure with a (Response, status) pair.
        if isinstance(validated[0], Response):
            return validated  # type: ignore[return-value]

        (
            parent_entity_class,
            parent_entity_shape,
            entity_class,
            entity_shape,
            predicate_uri,
            depth,
            is_template,
            all_form_fields,
        ) = cast("tuple[str, str, str, str, str, int, bool, dict]", validated)

        parent_fields = all_form_fields[(parent_entity_class, parent_entity_shape)]
        field_details_list = parent_fields[predicate_uri]

        matched_shape = None
        for field_details in field_details_list:
            alternatives = field_details.get("or")
            if not alternatives:
                continue
            for candidate in alternatives:
                if candidate.get("entityType") != entity_class:
                    continue
                if candidate.get("nodeShape") != entity_shape:
                    continue
                matched_shape = candidate
                break
            # Stop scanning further definitions once a usable match exists.
            if matched_shape:
                break

        if not matched_shape:
            no_match = {
                "status": "error",
                "message": (
                    "No matching shape info found for"
                    f" {entity_class}/{entity_shape}"
                    f" in parent predicate {predicate_uri}"
                ),
            }
            return jsonify(no_match), 404

        template_string = """
        {% from 'macros.jinja' import render_form_field with context %}
        {{ render_form_field(parent_entity_class, predicate_uri, shape_info,
            all_form_fields, depth, is_template=is_template) }}
        """

        return render_template_string(
            template_string,
            parent_entity_class=parent_entity_class,
            predicate_uri=predicate_uri,
            shape_info=matched_shape,
            all_form_fields=all_form_fields,
            depth=depth,
            is_template=is_template,
        )

    except Exception as e:
        current_app.logger.exception("Error rendering nested form HTML")
        failure = {
            "status": "error",
            "message": f"Failed to render nested form: {e!s}",
        }
        return jsonify(failure), 500