Coverage for heritrace / routes / api.py: 99%

503 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-21 12:56 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5# heritrace/routes/api.py 

6 

7import traceback 

8from typing import Dict, Optional 

9 

10import validators 

11from flask import (Blueprint, current_app, g, jsonify, render_template_string, 

12 request) 

13from flask_babel import gettext 

14from flask_login import current_user, login_required 

15from rdflib import RDF, XSD, Graph, Literal, URIRef 

16 

17from heritrace.apis.orcid import get_responsible_agent_uri 

18from heritrace.editor import Editor 

19from heritrace.extensions import (get_custom_filter, get_dataset_endpoint, 

20 get_form_fields, get_provenance_endpoint) 

21from heritrace.services.resource_lock_manager import LockStatus 

22from heritrace.utils.datatypes import DATATYPE_MAPPING 

23from heritrace.utils.primary_source_utils import \ 

24 save_user_default_primary_source 

25from heritrace.utils.shacl_utils import determine_shape_for_classes 

26from heritrace.utils.shacl_validation import validate_new_triple 

27from heritrace.utils.sparql_utils import (find_orphaned_entities, 

28 get_available_classes, 

29 get_catalog_data, 

30 get_deleted_entities_with_filtering, 

31 get_triples_from_graph, 

32 import_entity_graph, 

33 import_referenced_entities) 

34from heritrace.utils.strategies import (OrphanHandlingStrategy, 

35 ProxyHandlingStrategy) 

36from heritrace.utils.uri_utils import generate_unique_uri 

37from heritrace.utils.virtual_properties import \ 

38 transform_changes_with_virtual_properties 

39 

40api_bp = Blueprint("api", __name__) 

41 

42 

@api_bp.route("/catalogue")
@login_required
def catalogue_api():
    """Return one page of catalogue data as JSON.

    Query parameters:
        class: class to filter on (optional).
        shape: shape to filter on (optional).
        page: page number, defaults to 1.
        per_page: page size; a value outside ``CATALOGUE_ALLOWED_PER_PAGE``
            is replaced by ``CATALOGUE_DEFAULT_PER_PAGE``.
        sort_property: property to sort on; empty or "null" (any case)
            means no sort property.
        sort_direction: defaults to "ASC".

    Returns:
        The JSON-encoded result of ``get_catalog_data`` with an extra
        ``available_classes`` key.
    """
    default_per_page = current_app.config["CATALOGUE_DEFAULT_PER_PAGE"]

    selected_class = request.args.get("class")
    selected_shape = request.args.get("shape")
    # ``type=int`` makes the lookup fall back to the default when the value
    # is not a valid integer, instead of letting ValueError escape the view.
    page = request.args.get("page", 1, type=int)
    per_page = request.args.get("per_page", default_per_page, type=int)
    sort_property = request.args.get("sort_property")
    sort_direction = request.args.get("sort_direction", "ASC")

    if per_page not in current_app.config["CATALOGUE_ALLOWED_PER_PAGE"]:
        per_page = default_per_page

    # The client may send the literal string "null" for "no sort property".
    if not sort_property or sort_property.lower() == "null":
        sort_property = None

    available_classes = get_available_classes()

    catalog_data = get_catalog_data(
        selected_class=selected_class,
        page=page,
        per_page=per_page,
        sort_property=sort_property,
        sort_direction=sort_direction,
        selected_shape=selected_shape,
    )

    catalog_data["available_classes"] = available_classes
    return jsonify(catalog_data)

73 

74 

@api_bp.route("/time-vault")
@login_required
def get_deleted_entities_api():
    """Return one page of deleted entities as JSON.

    Query parameters:
        class: class to filter on (optional).
        shape: shape to filter on (optional).
        page: page number, defaults to 1.
        per_page: page size; a value outside ``CATALOGUE_ALLOWED_PER_PAGE``
            is replaced by ``CATALOGUE_DEFAULT_PER_PAGE``.
        sort_property: defaults to "deletionTime".
        sort_direction: defaults to "DESC".

    Returns:
        A JSON object with the entities of the requested page, pagination
        figures, the effective sort/filter settings and the available
        classes and sortable properties reported by
        ``get_deleted_entities_with_filtering``.
    """
    default_per_page = current_app.config["CATALOGUE_DEFAULT_PER_PAGE"]

    selected_class = request.args.get("class")
    selected_shape = request.args.get("shape")
    # ``type=int`` makes the lookup fall back to the default when the value
    # is not a valid integer, instead of letting ValueError escape the view.
    page = request.args.get("page", 1, type=int)
    per_page = request.args.get("per_page", default_per_page, type=int)
    sort_property = request.args.get("sort_property", "deletionTime")
    sort_direction = request.args.get("sort_direction", "DESC")

    if per_page not in current_app.config["CATALOGUE_ALLOWED_PER_PAGE"]:
        per_page = default_per_page

    # The helper may replace the requested class/shape, so both are rebound.
    (
        deleted_entities,
        available_classes,
        selected_class,
        selected_shape,
        sortable_properties,
        total_count,
    ) = get_deleted_entities_with_filtering(
        page, per_page, sort_property, sort_direction, selected_class, selected_shape
    )

    # Ceiling division; an empty result set has zero pages.
    total_pages = (total_count + per_page - 1) // per_page if total_count > 0 else 0

    return jsonify(
        {
            "entities": deleted_entities,
            "total_pages": total_pages,
            "current_page": page,
            "per_page": per_page,
            "total_count": total_count,
            "sort_property": sort_property,
            "sort_direction": sort_direction,
            "selected_class": selected_class,
            "selected_shape": selected_shape,
            "available_classes": available_classes,
            "sortable_properties": sortable_properties,
        }
    )

114 

115 

@api_bp.route("/check-lock", methods=["POST"])
@login_required
def check_lock():
    """Report the lock state of the resource named in the JSON request body.

    Reads ``resource_uri`` from the body and asks the request-scoped lock
    manager for its status.

    Returns:
        400 with an error payload when ``resource_uri`` is missing or empty;
        a "locked" payload (with the holder's name and id) or an
        "available" payload otherwise; 500 when the lock manager reports an
        error or any exception is raised while handling the request.
    """
    try:
        payload = request.get_json()
        resource_uri = payload.get("resource_uri")

        if not resource_uri:
            missing_uri = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri), 400

        status, lock_info = g.resource_lock_manager.check_lock_status(resource_uri)

        if status == LockStatus.LOCKED:
            locked_body = {
                "status": "locked",
                "title": gettext("Resource Locked"),
                "message": gettext(
                    "This resource is currently being edited by %(user)s [%(orcid)s]",
                    user=lock_info.user_name,
                    orcid=lock_info.user_id,
                ),
            }
            return jsonify(locked_body)

        if status == LockStatus.ERROR:
            failure_body = {
                "status": "error",
                "title": gettext("Error"),
                "message": gettext("An error occurred while checking the lock"),
            }
            return jsonify(failure_body), 500

        return jsonify({"status": "available"})

    except Exception as exc:
        current_app.logger.error(f"Error in check_lock: {str(exc)}")
        unexpected_body = {
            "status": "error",
            "title": gettext("Error"),
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

172 

173 

@api_bp.route("/acquire-lock", methods=["POST"])
@login_required
def acquire_lock():
    """Try to lock the resource named in the JSON request body.

    The body provides ``resource_uri`` and, optionally, ``linked_resources``
    (defaults to an empty list), which is forwarded to the lock manager.

    Returns:
        400 when ``resource_uri`` is missing or empty; 200 with status
        "locked" when the lock manager reports the resource as locked;
        a "success" payload when the lock is acquired; 423 when acquisition
        fails; 500 when any exception is raised while handling the request.
    """
    try:
        payload = request.get_json()
        resource_uri = payload.get("resource_uri")
        linked_resources = payload.get("linked_resources", [])

        if not resource_uri:
            missing_uri = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri), 400

        lock_manager = g.resource_lock_manager

        # Ask for the current status first so the caller can be told who
        # holds the lock.
        status, lock_info = lock_manager.check_lock_status(resource_uri)
        if status == LockStatus.LOCKED:
            locked_body = {
                "status": "locked",
                "title": gettext("Resource Locked"),
                "message": gettext(
                    "This resource is currently being edited by %(user)s [%(orcid)s]",
                    user=lock_info.user_name,
                    orcid=lock_info.user_id,
                ),
            }
            return jsonify(locked_body), 200

        # The linked resources supplied by the client are passed through.
        if lock_manager.acquire_lock(resource_uri, linked_resources):
            return jsonify({"status": "success"})

        refused_body = {
            "status": "error",
            "message": gettext("Resource is locked by another user"),
        }
        return jsonify(refused_body), 423

    except Exception as exc:
        current_app.logger.error(f"Error in acquire_lock: {str(exc)}")
        unexpected_body = {
            "status": "error",
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

233 

234 

@api_bp.route("/release-lock", methods=["POST"])
@login_required
def release_lock():
    """Release the lock on the resource named in the JSON request body.

    Returns:
        400 when ``resource_uri`` is missing or empty, or when the lock
        manager reports that the lock could not be released; a "success"
        payload otherwise; 500 when any exception is raised while handling
        the request.
    """
    try:
        payload = request.get_json()
        resource_uri = payload.get("resource_uri")

        if not resource_uri:
            missing_uri = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri), 400

        released = g.resource_lock_manager.release_lock(resource_uri)
        if not released:
            not_released = {
                "status": "error",
                "message": gettext("Unable to release lock"),
            }
            return jsonify(not_released), 400

        return jsonify({"status": "success"})

    except Exception as exc:
        current_app.logger.error(f"Error in release_lock: {str(exc)}")
        unexpected_body = {
            "status": "error",
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

269 

270 

@api_bp.route("/renew-lock", methods=["POST"])
@login_required
def renew_lock():
    """Renew the lock on the resource named in the JSON request body.

    Renewal re-runs lock acquisition for ``resource_uri`` with an empty
    list of linked resources.

    Returns:
        400 when ``resource_uri`` is missing or empty; a "success" payload
        when the lock manager accepts the renewal; 423 when it does not;
        500 when any exception is raised while handling the request.
    """
    try:
        payload = request.get_json()
        resource_uri = payload.get("resource_uri")

        if not resource_uri:
            missing_uri = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri), 400

        # Only the existing lock is refreshed, so no linked resources are
        # passed along.
        renewed = g.resource_lock_manager.acquire_lock(resource_uri, [])
        if not renewed:
            not_renewed = {
                "status": "error",
                "message": gettext("Unable to renew lock"),
            }
            return jsonify(not_renewed), 423

        return jsonify({"status": "success"})

    except Exception as exc:
        current_app.logger.error(f"Error in renew_lock: {str(exc)}")
        unexpected_body = {
            "status": "error",
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

307 

308 

@api_bp.route("/validate-literal", methods=["POST"])
@login_required
def validate_literal():
    """List the datatypes whose validator accepts the submitted literal.

    Reads ``value`` from the JSON body and runs it through every validation
    function in ``DATATYPE_MAPPING``.

    Returns:
        400 when ``value`` is missing/empty or no validator accepts it;
        200 with ``valid_datatypes`` (datatype identifiers as strings)
        otherwise.
    """
    value = request.json.get("value")
    if not value:
        return jsonify({"error": gettext("Value is required.")}), 400

    accepted = [
        str(datatype)
        for datatype, is_valid, _ in DATATYPE_MAPPING
        if is_valid(value)
    ]
    if not accepted:
        return jsonify({"error": gettext("No matching datatypes found.")}), 400

    return jsonify({"valid_datatypes": accepted}), 200

326 

327 

@api_bp.route("/check_orphans", methods=["POST"])
@login_required
def check_orphans():
    """Report entities that the requested deletions would leave orphaned.

    The JSON body must contain ``changes`` and ``entity_type``;
    ``entity_shape`` is optional. Orphans and intermediate relations
    (proxies) are looked up for every "delete" change, each kind being
    collected only when its configured strategy is DELETE or ASK. Both
    kinds are returned in a single ``affected_entities`` list.

    Returns:
        400 when required fields are missing or a ValueError is raised;
        500 on any other exception; otherwise a "success" payload whose
        ``should_delete`` flag is true only when both strategies are DELETE.
    """
    try:
        config = current_app.config
        orphan_strategy = config.get(
            "ORPHAN_HANDLING_STRATEGY", OrphanHandlingStrategy.KEEP
        )
        proxy_strategy = config.get(
            "PROXY_HANDLING_STRATEGY", ProxyHandlingStrategy.KEEP
        )

        data = request.json
        if not data or "changes" not in data or "entity_type" not in data:
            invalid_body = {
                "status": "error",
                "error_type": "validation",
                "message": gettext(
                    "Invalid request: 'changes' and 'entity_type' are required fields"
                ),
            }
            return jsonify(invalid_body), 400

        changes = data.get("changes", [])
        entity_type = data.get("entity_type")
        entity_shape = data.get("entity_shape")
        custom_filter = get_custom_filter()

        # A kind is only collected when its strategy may lead to deletion.
        check_for_orphans = orphan_strategy in (
            OrphanHandlingStrategy.DELETE,
            OrphanHandlingStrategy.ASK,
        )
        check_for_proxies = proxy_strategy in (
            ProxyHandlingStrategy.DELETE,
            ProxyHandlingStrategy.ASK,
        )

        orphans = []
        intermediate_orphans = []
        if check_for_orphans or check_for_proxies:
            for change in changes:
                if change["action"] != "delete":
                    continue
                found_orphans, found_intermediates = find_orphaned_entities(
                    change["subject"],
                    entity_type,
                    change.get("predicate"),
                    change.get("object"),
                )
                if check_for_orphans:
                    orphans.extend(found_orphans)
                if check_for_proxies:
                    intermediate_orphans.extend(found_intermediates)

        # Nothing to report when, for each kind, the strategy is KEEP or
        # nothing was found.
        orphans_pending = orphan_strategy != OrphanHandlingStrategy.KEEP and orphans
        proxies_pending = (
            proxy_strategy != ProxyHandlingStrategy.KEEP and intermediate_orphans
        )
        if not (orphans_pending or proxies_pending):
            return jsonify({"status": "success", "affected_entities": []})

        def describe(entity, is_intermediate):
            """Build the display record for one affected entity."""
            type_and_shape = (entity["type"], entity_shape)
            return {
                "uri": entity["uri"],
                "label": custom_filter.human_readable_entity(
                    entity["uri"], type_and_shape
                ),
                "type": custom_filter.human_readable_class(type_and_shape),
                "is_intermediate": is_intermediate,
            }

        # Orphans come first, proxies after, in one combined list.
        affected_entities = [describe(entity, False) for entity in orphans]
        affected_entities += [
            describe(entity, True) for entity in intermediate_orphans
        ]

        # Automatic deletion needs both strategies to be DELETE; otherwise
        # at least one of them requires asking the user.
        delete_automatically = (
            orphan_strategy == OrphanHandlingStrategy.DELETE
            and proxy_strategy == ProxyHandlingStrategy.DELETE
        )

        return jsonify(
            {
                "status": "success",
                "affected_entities": affected_entities,
                "should_delete": delete_automatically,
                "orphan_strategy": orphan_strategy.value,
                "proxy_strategy": proxy_strategy.value,
            }
        )
    except ValueError as exc:
        # Validation problems are logged as warnings and answered with 400.
        error_message = str(exc)
        current_app.logger.warning(
            f"Validation error in check_orphans: {error_message}"
        )
        validation_body = {
            "status": "error",
            "error_type": "validation",
            "message": gettext(
                "An error occurred while checking for orphaned entities"
            ),
        }
        return jsonify(validation_body), 400
    except Exception as exc:
        # Anything else is logged with its traceback and answered with 500.
        error_message = f"Error checking orphans: {str(exc)}"
        current_app.logger.error(f"{error_message}\n{traceback.format_exc()}")
        system_body = {
            "status": "error",
            "error_type": "system",
            "message": gettext(
                "An error occurred while checking for orphaned entities"
            ),
        }
        return jsonify(system_body), 500

481 

482 

@api_bp.route("/apply_changes", methods=["POST"])
@login_required
def apply_changes():
    """Apply a batch of changes to entities.

    Request body: a non-empty JSON list of change objects. Request-level
    options are read from the FIRST change object only:
        "subject": (str) Main entity URI being modified,
        "primary_source": (str) Primary source to use for provenance,
        "save_default_source": (bool) Whether to save primary_source as
            default for the current user,
        "affected_entities": (list) Entities potentially affected by delete
            operations, each with "uri" and optional "is_intermediate",
        "delete_affected": (bool) Whether to delete affected entities

    Every change object carries an "action" and the fields that action reads:
        "create": "data", optional "subject"
        "delete": "subject", optional "predicate", "object",
            "entity_type", "entity_shape"
        "update": "subject", "predicate", "object", "newObject",
            optional "entity_type", "entity_shape"
        "order": "subject", "predicate", "object", "newObject"

    Responses:
        200 OK: Changes applied successfully
        400 Bad Request: Invalid request or validation error
        500 Internal Server Error: Server error while applying changes
    """
    try:
        changes = request.get_json()
        if not changes:
            return jsonify({"error": "No request data provided"}), 400

        # Request-level options travel on the first change object.
        first_change = changes[0]
        subject = first_change.get("subject")
        affected_entities = first_change.get("affected_entities", [])
        delete_affected = first_change.get("delete_affected", False)
        primary_source = first_change.get("primary_source")
        save_default_source = first_change.get("save_default_source", False)

        # Past this guard primary_source is either falsy or a valid URL, so
        # it is not validated again below.
        if primary_source and not validators.url(primary_source):
            return jsonify({"error": "Invalid primary source URL"}), 400

        if save_default_source and primary_source:
            save_user_default_primary_source(current_user.orcid, primary_source)

        changes = transform_changes_with_virtual_properties(changes)

        deleted_entities = set()
        editor = Editor(
            get_dataset_endpoint(),
            get_provenance_endpoint(),
            current_app.config["COUNTER_HANDLER"],
            URIRef(get_responsible_agent_uri(current_user.orcid)),
            current_app.config["PRIMARY_SOURCE"],
            current_app.config["DATASET_GENERATION_TIME"],
            dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        )

        if primary_source:
            editor.set_primary_source(primary_source)

        # A delete without a predicate removes a whole entity; in that case
        # the entities referencing the subject are imported as well.
        has_entity_deletion = any(
            change["action"] == "delete" and not change.get("predicate")
            for change in changes
        )

        editor = import_entity_graph(
            editor,
            subject,
            include_referencing_entities=has_entity_deletion,
        )

        # Import the entities referenced by the data about to be created,
        # then mark the imported state as preexisting.
        for change in changes:
            if change["action"] == "create":
                data = change.get("data")
                if data:
                    import_referenced_entities(editor, data)

        editor.preexisting_finished()

        # In a quadstore, reuse the graph of the first quad found for the
        # main subject.
        graph_uri = None
        if editor.dataset_is_quadstore:
            subject_quads = editor.g_set.quads((URIRef(subject), None, None, None))
            first_quad = next(iter(subject_quads), None)
            if first_quad is not None:
                graph_uri = get_graph_uri_from_context(first_quad[3])

        temp_id_to_uri = {}
        for change in changes:
            if change["action"] != "create":
                continue
            data = change.get("data")
            if not data:
                continue
            change_subject = change.get("subject")
            created_subject = create_logic(
                editor,
                data,
                change_subject,
                graph_uri,
                temp_id_to_uri=temp_id_to_uri,
                parent_entity_type=None,
            )
            # Only update the main subject if this is the main entity
            if change_subject is not None:
                subject = created_subject

        orphan_strategy = current_app.config.get(
            "ORPHAN_HANDLING_STRATEGY", OrphanHandlingStrategy.KEEP
        )
        proxy_strategy = current_app.config.get(
            "PROXY_HANDLING_STRATEGY", ProxyHandlingStrategy.KEEP
        )

        # Deletions run in two phases:
        # 1. delete the orphaned/intermediate entities,
        # 2. process the remaining changes, including specific triples.

        # Phase 1: delete orphaned/intermediate entities. This only runs
        # when the client confirmed deletion, so a strategy of DELETE or ASK
        # is enough to remove the corresponding kind.
        if affected_entities and delete_affected:
            orphans = [
                entity
                for entity in affected_entities
                if not entity.get("is_intermediate")
            ]
            proxies = [
                entity
                for entity in affected_entities
                if entity.get("is_intermediate")
            ]

            pending_removal = []
            if orphan_strategy in (
                OrphanHandlingStrategy.DELETE,
                OrphanHandlingStrategy.ASK,
            ):
                pending_removal.extend(orphans)
            if proxy_strategy in (
                ProxyHandlingStrategy.DELETE,
                ProxyHandlingStrategy.ASK,
            ):
                pending_removal.extend(proxies)

            # Orphans are removed before proxies; each URI only once.
            for entity in pending_removal:
                entity_uri = entity["uri"]
                if entity_uri in deleted_entities:
                    continue
                delete_logic(editor, entity_uri, graph_uri=graph_uri)
                deleted_entities.add(entity_uri)

        # Phase 2: process all the other changes.
        for change in changes:
            action = change["action"]
            if action == "delete":
                subject_uri = change["subject"]
                predicate = change.get("predicate")
                object_value = change.get("object")

                if not predicate:
                    # Whole-entity deletion; skip entities already removed.
                    if subject_uri in deleted_entities:
                        continue

                    delete_logic(
                        editor,
                        subject_uri,
                        graph_uri=graph_uri,
                        entity_type=change.get("entity_type"),
                        entity_shape=change.get("entity_shape"),
                    )
                    deleted_entities.add(subject_uri)
                elif object_value:
                    # Single-triple deletion; skip it when the object is an
                    # entity that has already been deleted.
                    if object_value in deleted_entities:
                        continue

                    delete_logic(
                        editor,
                        subject_uri,
                        predicate,
                        object_value,
                        graph_uri,
                        change.get("entity_type"),
                        change.get("entity_shape"),
                    )
            elif action == "update":
                update_logic(
                    editor,
                    change["subject"],
                    change["predicate"],
                    change["object"],
                    change["newObject"],
                    graph_uri,
                    change.get("entity_type"),
                    change.get("entity_shape"),
                )
            elif action == "order":
                order_logic(
                    editor,
                    change["subject"],
                    change["predicate"],
                    change["object"],
                    change["newObject"],
                    graph_uri,
                    temp_id_to_uri,
                )

        try:
            editor.save()
        except ValueError as ve:
            # Re-raise ValueError so it can be caught by the outer try-except block
            current_app.logger.error(f"Error during save operation: {str(ve)}")
            raise
        except Exception as save_error:
            current_app.logger.error(f"Error during save operation: {str(save_error)}")
            return jsonify(
                {
                    "status": "error",
                    "error_type": "database",
                    "message": gettext("Failed to save changes to the database: {}").format(str(save_error)),
                }
            ), 500

        return (
            jsonify(
                {
                    "status": "success",
                    "message": gettext("Changes applied successfully"),
                }
            ),
            200,
        )

    except ValueError as e:
        # Handle validation errors specifically
        error_message = str(e)
        current_app.logger.warning(f"Validation error: {error_message}")
        return (
            jsonify(
                {
                    "status": "error",
                    "error_type": "validation",
                    "message": error_message,
                }
            ),
            400,
        )
    except Exception as e:
        # Handle other errors
        error_message = (
            f"Error while applying changes: {str(e)}\n{traceback.format_exc()}"
        )
        current_app.logger.error(error_message)
        return (
            jsonify(
                {
                    "status": "error",
                    "error_type": "system",
                    "message": gettext("An error occurred while applying changes"),
                }
            ),
            500,
        )

729 

730 

def get_graph_uri_from_context(graph_context):
    """Return the graph URI carried by a graph context.

    Args:
        graph_context: Either a ``Graph`` object or a direct URI reference.

    Returns:
        The graph's ``identifier`` when given a ``Graph``; otherwise the
        argument itself, unchanged.
    """
    is_graph = isinstance(graph_context, Graph)
    return graph_context.identifier if is_graph else graph_context

744 

745 

def determine_datatype(value, datatype_uris):
    """Pick the first candidate datatype whose validator accepts ``value``.

    Args:
        value: The value to check.
        datatype_uris: Candidate datatype URIs, tried in order.

    Returns:
        A ``URIRef`` for the first candidate whose validation function in
        ``DATATYPE_MAPPING`` accepts ``value``; ``XSD.string`` when none
        does.
    """
    for candidate in datatype_uris:
        candidate_key = str(candidate)
        # Find the validator of the first mapping entry for this datatype.
        validator = None
        for entry in DATATYPE_MAPPING:
            if str(entry[0]) == candidate_key:
                validator = entry[1]
                break
        if validator and validator(value):
            return URIRef(candidate)
    # If none match, default to XSD.string
    return XSD.string

755 

756 

def create_logic(
    editor: Editor,
    data: Dict[str, dict],
    subject=None,
    graph_uri=None,
    parent_subject=None,
    parent_predicate=None,
    temp_id_to_uri=None,
    parent_entity_type=None,
):
    """
    Create an entity, and recursively its nested entities, from a dictionary.

    Triples are validated with ``validate_new_triple`` before being created,
    except for existing-entity references and custom properties, which are
    written directly. A validation error message is raised as ValueError.

    Args:
        editor (Editor): The editor instance for graph operations.
        data (Dict[str, dict]): A dictionary describing the entity to create.
        subject (URIRef, optional): The subject URI of the entity. If None, a new URI is generated.
        graph_uri (str, optional): The named graph URI for the operations.
        parent_subject (URIRef, optional): The subject URI of the parent entity.
        parent_predicate (URIRef, optional): The predicate URI linking the parent to this entity.
        temp_id_to_uri (Dict, optional): Mapping of temporary frontend IDs to backend URIs;
            when given, the entity's ``tempId`` (if any) is recorded in it.
        parent_entity_type (str, optional): The RDF type of the parent entity, for validation.

    Returns:
        The subject of the created entity (the given one or the generated one).

    Raises:
        ValueError: If validation reports an error, an existing-entity
            reference has no ``entity_uri``, or a custom property has an
            unknown ``type``.

    Example of `data` structure:
        {
            "entity_type": "http://purl.org/spar/fabio/JournalArticle",
            "properties": {
                "http://purl.org/spar/pro/isDocumentContextFor": [
                    {
                        "entity_type": "http://purl.org/spar/pro/RoleInTime",
                        "properties": {
                            "http://purl.org/spar/pro/isHeldBy": [
                                "https://w3id.org/oc/meta/ra/09110374"
                            ],
                            "http://purl.org/spar/pro/withRole": "http://purl.org/spar/pro/author"
                        },
                        "tempId": "temp-1"
                    }
                ]
            }
        }
    """
    entity_type = data.get("entity_type")
    properties = data.get("properties", {})
    temp_id = data.get("tempId")

    if subject is None:
        subject = generate_unique_uri(entity_type, data)

    if temp_id and temp_id_to_uri is not None:
        temp_id_to_uri[temp_id] = str(subject)

    # The rdf:type triple is written here only for entities that have a parent.
    if parent_subject is not None:
        type_value, _, error_message = validate_new_triple(
            subject, RDF.type, entity_type, "create", entity_types=entity_type
        )
        if error_message:
            raise ValueError(error_message)

        if type_value is not None:
            editor.create(URIRef(subject), RDF.type, type_value, graph_uri)

    # Link the parent to this entity, validated against the parent's type.
    if parent_subject and parent_predicate:
        parent_value, _, error_message = validate_new_triple(
            parent_subject,
            parent_predicate,
            subject,
            "create",
            entity_types=parent_entity_type,
        )
        if error_message:
            raise ValueError(error_message)

        if parent_value is not None:
            editor.create(
                URIRef(parent_subject),
                URIRef(parent_predicate),
                parent_value,
                graph_uri,
            )

    for predicate, raw_values in properties.items():
        values = raw_values if isinstance(raw_values, list) else [raw_values]
        for value in values:
            is_mapping = isinstance(value, dict)

            # Nested entity: a dictionary with its own 'entity_type' is
            # created recursively, with the current entity as its parent.
            if is_mapping and "entity_type" in value:
                nested_subject = generate_unique_uri(value["entity_type"])
                create_logic(
                    editor,
                    value,
                    nested_subject,
                    graph_uri,
                    subject,
                    predicate,
                    temp_id_to_uri,
                    parent_entity_type=entity_type,
                )
                continue

            # Reference to an existing entity: linked by URI as-is.
            if is_mapping and value.get("is_existing_entity", False):
                entity_uri = value.get("entity_uri")
                if not entity_uri:
                    raise ValueError("Missing entity_uri in existing entity reference")
                editor.create(
                    URIRef(subject), URIRef(predicate), URIRef(entity_uri), graph_uri
                )
                continue

            # Custom property: written without validation, as a URI or as
            # a literal (xsd:string unless a datatype is given).
            if is_mapping and value.get("is_custom_property", False):
                if value["type"] == "uri":
                    object_value = URIRef(value["value"])
                elif value["type"] == "literal":
                    if "datatype" in value:
                        datatype = URIRef(value["datatype"])
                    else:
                        datatype = XSD.string
                    object_value = Literal(value["value"], datatype=datatype)
                else:
                    raise ValueError(f"Unknown custom property type: {value['type']}")

                editor.create(
                    URIRef(subject), URIRef(predicate), object_value, graph_uri
                )
                continue

            # Standard property: validation returns the typed RDF object to
            # store, or None when nothing should be written.
            object_value, _, error_message = validate_new_triple(
                subject, predicate, value, "create", entity_types=entity_type
            )
            if error_message:
                raise ValueError(error_message)

            if object_value is not None:
                editor.create(
                    URIRef(subject), URIRef(predicate), object_value, graph_uri
                )

    return subject

917 

918 

def update_logic(
    editor: Editor,
    subject,
    predicate,
    old_value,
    new_value,
    graph_uri=None,
    entity_type=None,
    entity_shape=None,
):
    """
    Validate and apply the replacement of one object value of a triple.

    Args:
        editor: The editor instance
        subject: Subject of the triple
        predicate: Predicate of the triple
        old_value: Object value to replace
        new_value: Object value to store instead
        graph_uri: Optional graph URI
        entity_type: Optional entity type(s) passed to validation
        entity_shape: Optional entity shape passed to validation

    Raises:
        ValueError: If validation returns an error message.
    """
    validated_new, validated_old, error_message = validate_new_triple(
        subject,
        predicate,
        new_value,
        "update",
        old_value,
        entity_types=entity_type,
        entity_shape=entity_shape,
    )
    if error_message:
        raise ValueError(error_message)

    # The values returned by validation are the ones written, not the raw inputs.
    editor.update(
        URIRef(subject), URIRef(predicate), validated_old, validated_new, graph_uri
    )

936 

937 

def rebuild_entity_order(
    editor: Editor,
    ordered_by_uri: URIRef,
    entities: list,
    graph_uri=None
):
    """
    Replace the ordering links between entities with a fresh chain.

    Args:
        editor: The editor instance
        ordered_by_uri: The property used for ordering
        entities: List of entities, in the desired order
        graph_uri: Optional graph URI

    Returns:
        The same editor instance.
    """
    # Drop every existing ordering link that starts from one of the entities.
    for entity in entities:
        existing_links = list(
            get_triples_from_graph(editor.g_set, (entity, ordered_by_uri, None))
        )
        for _, _, linked in existing_links:
            editor.delete(entity, ordered_by_uri, linked, graph_uri)

    # Link each entity to its successor; the last one gets no outgoing link.
    for current_entity, next_entity in zip(entities, entities[1:]):
        editor.create(current_entity, ordered_by_uri, next_entity, graph_uri)

    return editor

965 

966 

def delete_logic(
    editor: Editor,
    subject,
    predicate=None,
    object_value=None,
    graph_uri=None,
    entity_type=None,
    entity_shape=None,
):
    """
    Delete through the editor, validating first when a triple is targeted.

    When both ``predicate`` and ``object_value`` are given, the deletion is
    validated and the object value returned by validation is the one
    passed to the editor. Otherwise the arguments are forwarded as they are.

    Args:
        editor: The editor instance
        subject: Subject URI
        predicate: Optional predicate URI
        object_value: Optional object value
        graph_uri: Optional graph URI
        entity_type: Optional entity type(s) passed to validation
        entity_shape: Optional entity shape passed to validation

    Raises:
        ValueError: If validation returns an error message.
    """
    subject_uri = URIRef(subject)
    predicate_uri = None
    if predicate:
        predicate_uri = URIRef(predicate)

    if predicate and object_value:
        _, object_value, error_message = validate_new_triple(
            subject,
            predicate,
            None,
            "delete",
            object_value,
            entity_types=entity_type,
            entity_shape=entity_shape,
        )
        if error_message:
            raise ValueError(error_message)

    editor.delete(subject_uri, predicate_uri, object_value, graph_uri)

990 

991 

def order_logic(
    editor: Editor,
    subject,
    predicate,
    new_order,
    ordered_by,
    graph_uri=None,
    temp_id_to_uri: Optional[Dict] = None,
):
    """
    Re-create the ordered entities attached to a subject and chain them anew.

    Each entity currently linked to ``subject`` through ``predicate`` whose
    string form appears in ``new_order`` is deleted and replaced by a new
    entity with a URI from generate_unique_uri. The new entity receives all
    properties of the old one except ``predicate`` and ``ordered_by``.
    Finally the ordering chain is rebuilt following ``new_order``.

    Args:
        editor: The editor instance used for every read and write
        subject: The subject that owns the ordered entities
        predicate: The predicate linking the subject to the ordered entities
        new_order: Entity identifiers in the desired order
        ordered_by: The property used to chain the entities
        graph_uri: Optional graph URI forwarded to the editor
        temp_id_to_uri: Optional mapping used to resolve entries of
            ``new_order`` that were not re-created here; entries absent from
            the mapping are used as they are

    Returns:
        The same editor instance that was passed in

    Raises:
        ValueError: If a pre-existing entity has no rdf:type triple
    """
    # The parameter defaults to None, but it is dereferenced below for every
    # entry of new_order that was not re-created; normalise it so the default
    # does not crash with AttributeError.
    if temp_id_to_uri is None:
        temp_id_to_uri = {}

    subject_uri = URIRef(subject)
    predicate_uri = URIRef(predicate)
    ordered_by_uri = URIRef(ordered_by)

    # Fetch all currently linked entities directly from the editor
    current_entities = [
        o for _, _, o in get_triples_from_graph(editor.g_set, (subject_uri, predicate_uri, None))
    ]

    # Maps each old entity to the entity that replaces it
    old_to_new_mapping = {}

    for old_entity in current_entities:
        # Only pre-existing entities listed in the new order are processed
        if str(old_entity) not in new_order:
            continue

        # Snapshot every property of the entity before it is deleted
        entity_properties = list(get_triples_from_graph(editor.g_set, (old_entity, None, None)))

        entity_type = next(
            (o for _, p, o in entity_properties if p == RDF.type), None
        )

        if entity_type is None:
            raise ValueError(
                f"Impossibile determinare il tipo dell'entità per {old_entity}"
            )

        # Mint the replacement entity
        new_entity_uri = generate_unique_uri(entity_type)
        old_to_new_mapping[old_entity] = new_entity_uri

        # Remove the old entity and its link from the subject
        editor.delete(subject_uri, predicate_uri, old_entity, graph_uri)
        editor.delete(old_entity, graph=graph_uri)

        # Re-attach the subject to the replacement entity
        editor.create(subject_uri, predicate_uri, new_entity_uri, graph_uri)

        # Copy every other property over; the ordering property is skipped
        # because the chain is rebuilt from scratch below
        for _, p, o in entity_properties:
            if p != predicate_uri and p != ordered_by_uri:
                editor.create(new_entity_uri, p, o, graph_uri)

    # Build the list of entities in the requested order
    ordered_entities = []
    for entity in new_order:
        new_entity_uri = old_to_new_mapping.get(URIRef(entity))
        if not new_entity_uri:
            # Not re-created above: resolve through the temporary-id mapping,
            # falling back to the identifier itself
            new_entity_uri = URIRef(temp_id_to_uri.get(entity, entity))
        ordered_entities.append(new_entity_uri)

    # Rebuild the ordering chain
    if ordered_entities:
        rebuild_entity_order(editor, ordered_by_uri, ordered_entities, graph_uri)

    return editor

1056 

1057 

@api_bp.route("/human-readable-entity", methods=["POST"])
@login_required
def get_human_readable_entity():
    """
    Return the human-readable representation of an entity.

    Form parameters:
        uri: URI of the entity
        entity_class: URI of the entity class

    Returns:
        The value produced by the custom filter's ``human_readable_entity``,
        or a JSON error with status 400 when a form parameter is missing
    """
    custom_filter = get_custom_filter()

    form = request.form
    has_all_params = all(name in form for name in ("uri", "entity_class"))
    if not has_all_params:
        return jsonify({"status": "error", "message": "Missing required parameters"}), 400

    uri = form["uri"]
    entity_class = form["entity_class"]

    # The shape is derived from the class alone; the filter receives the
    # (class, shape) pair as its entity key.
    shape = determine_shape_for_classes([entity_class])
    return custom_filter.human_readable_entity(uri, (entity_class, shape))

1073 

1074 

@api_bp.route('/format-source', methods=['POST'])
@login_required
def format_source_api():
    """
    API endpoint to format a source URL using the application's filters.

    Accepts POST request with JSON body: {"url": "source_url"}

    Returns:
        JSON ``{"formatted_html": "html_string"}`` on success. If the custom
        filter raises, the error is logged and a plain anchor pointing at the
        URL is returned instead. A missing or invalid URL (including a JSON
        body that is not an object) yields ``{"error": ...}`` with status 400.
    """
    # Local import: only the fallback path needs it.
    from html import escape

    data = request.get_json()
    # A JSON body such as `null` or a list has no ``get``; treat it as a
    # request without a URL instead of failing with AttributeError.
    source_url = data.get('url') if isinstance(data, dict) else None

    if not source_url or not validators.url(source_url):
        return jsonify({"error": gettext("Invalid or missing URL")}), 400

    try:
        custom_filter = get_custom_filter()
        formatted_html = custom_filter.format_source_reference(source_url)
        return jsonify({"formatted_html": formatted_html})
    except Exception as e:
        current_app.logger.error(f"Error formatting source URL '{source_url}': {e}")
        # The URL comes from the request, so escape it before embedding it in
        # markup (both in the attribute value and in the link text).
        safe_url = escape(source_url, quote=True)
        fallback_html = f'<a href="{safe_url}" target="_blank">{safe_url}</a>'
        return jsonify({"formatted_html": fallback_html})

1097 

1098 

@api_bp.route("/form-fields", methods=["GET"])
@login_required
def get_form_fields_for_entity():
    """
    Return the form fields registered for an entity class and shape pair.

    Query parameters:
        entity_class: URI of the entity class
        entity_shape: URI of the entity shape

    Returns:
        JSON response with ``status``, ``form_fields`` (a list of
        ``[property, details]`` pairs in the stored order) and ``entity_key``.
        Errors are reported as JSON with status 400 (missing parameter),
        404 (unknown class/shape pair) or 500 (form fields unavailable or
        unexpected failure).
    """

    def _error(message, status_code):
        # Shared shape for every error response of this endpoint.
        return jsonify({
            'status': 'error',
            'message': message
        }), status_code

    try:
        entity_class_decoded = request.args.get('entity_class')
        entity_shape_decoded = request.args.get('entity_shape')

        if not (entity_class_decoded and entity_shape_decoded):
            return _error('Missing required parameters: entity_class and entity_shape', 400)

        all_form_fields = get_form_fields()
        if not all_form_fields:
            return _error('Form fields not initialized', 500)

        entity_key = (entity_class_decoded, entity_shape_decoded)
        if entity_key not in all_form_fields:
            return _error(
                f'No form fields found for entity class {entity_class_decoded} with shape {entity_shape_decoded}',
                404,
            )

        # Emit [property, details] pairs so the order of the mapping is kept
        # in the JSON output.
        ordered_properties = [
            [prop, details_list]
            for prop, details_list in all_form_fields[entity_key].items()
        ]

        return jsonify({
            'status': 'success',
            'form_fields': ordered_properties,
            'entity_key': [entity_class_decoded, entity_shape_decoded]
        })

    except Exception as e:
        current_app.logger.error(f"Error loading form fields for {entity_class_decoded}/{entity_shape_decoded}: {e}")
        current_app.logger.error(f"Full traceback: {traceback.format_exc()}")

        return _error(f'Failed to load form fields: {str(e)}', 500)

1161 

1162 

@api_bp.route("/render-form-fields", methods=["POST"])
@login_required
def render_form_fields_html():
    """
    Render the form fields of an entity as HTML for dynamic loading.

    Expects JSON payload with:
        - entity_key: [entity_class, entity_shape] array

    Returns:
        HTML string of the rendered form fields, or a JSON error with status
        400 (missing ``entity_key``), 404 (unknown class/shape pair) or 500
        (form fields unavailable or unexpected failure)
    """
    try:
        payload = request.get_json()

        if not payload or 'entity_key' not in payload:
            return jsonify({
                'status': 'error',
                'message': 'Missing required field: entity_key'
            }), 400

        # entity_key arrives as a two-element array: [entity_class, entity_shape]
        entity_class, entity_shape = payload['entity_key']

        all_form_fields = get_form_fields()

        if not all_form_fields:
            return jsonify({
                'status': 'error',
                'message': 'Form fields not initialized'
            }), 500

        # The form fields mapping is keyed by (class, shape) tuples.
        lookup_key = (entity_class, entity_shape)
        if lookup_key not in all_form_fields:
            return jsonify({
                'status': 'error',
                'message': f'No form fields found for entity {entity_class} with shape {entity_shape}'
            }), 404

        # [property, details] pairs, in the order stored in the mapping.
        form_fields_array = []
        for prop, details_list in all_form_fields[lookup_key].items():
            form_fields_array.append([prop, details_list])

        template_string = '''
        {% from 'macros.jinja' import render_form_field with context %}

        {% set entity_type = entity_class %}
        {% set entity_shape = entity_shape %}
        {% set group_id = ((entity_type, entity_shape) | human_readable_class + "_group") | replace(" ", "_") %}
        <div class="property-group mb-3" id="{{ group_id }}" data-uri="{{ entity_type }}" data-shape="{{ entity_shape }}">
            {% for prop_data in ordered_form_fields %}
                {% set prop = prop_data[0] %}
                {% set details_list = prop_data[1] %}
                {% for details in details_list %}
                    {{ render_form_field(entity_type, prop, details, all_form_fields) }}
                {% endfor %}
            {% endfor %}
        </div>
        '''

        template_context = {
            'entity_class': entity_class,
            'entity_shape': entity_shape,
            'ordered_form_fields': form_fields_array,
            'all_form_fields': all_form_fields,
        }
        return render_template_string(template_string, **template_context)

    except Exception as e:
        current_app.logger.error(f"Error rendering form fields HTML: {e}")
        current_app.logger.error(f"Full traceback: {traceback.format_exc()}")

        return jsonify({
            'status': 'error',
            'message': f'Failed to render form fields: {str(e)}'
        }), 500

1241 

1242 

@api_bp.route("/render-nested-form", methods=["POST"])
@login_required
def render_nested_form_html():
    """
    Render a single nested form for lazy loading in sh:or contexts.

    Expects JSON payload with:
        - parent_entity_class: The parent entity class URI
        - parent_entity_shape: The parent entity shape URI
        - entity_class: The sub-entity class URI to render
        - entity_shape: The sub-entity shape URI to render
        - predicate_uri: The predicate URI linking parent to sub-entity
        - depth: The nesting depth (converted with ``int``)
        - is_template: Whether this is a template item (optional, default False)

    Returns:
        HTML string of the rendered nested form, or a JSON error with status
        400 (no payload or missing fields), 404 (unknown parent, predicate or
        shape) or 500 (form fields unavailable or unexpected failure)
    """
    try:
        data = request.get_json()

        required_fields = ['parent_entity_class', 'parent_entity_shape', 'entity_class', 'entity_shape', 'predicate_uri', 'depth']

        if not data:
            return jsonify({
                'status': 'error',
                'message': 'No JSON data provided'
            }), 400

        missing_fields = [field for field in required_fields if field not in data]
        if missing_fields:
            # Report only the fields that are actually absent, not the whole
            # list of required ones.
            return jsonify({
                'status': 'error',
                'message': f'Missing required fields: {", ".join(missing_fields)}'
            }), 400

        parent_entity_class = data['parent_entity_class']
        parent_entity_shape = data['parent_entity_shape']
        entity_class = data['entity_class']
        entity_shape = data['entity_shape']
        predicate_uri = data['predicate_uri']
        depth = int(data['depth'])
        is_template = data.get('is_template', False)

        all_form_fields = get_form_fields()

        if not all_form_fields:
            return jsonify({
                'status': 'error',
                'message': 'Form fields not initialized'
            }), 500

        # The form fields mapping is keyed by (class, shape) tuples.
        parent_entity_key = (parent_entity_class, parent_entity_shape)
        if parent_entity_key not in all_form_fields:
            return jsonify({
                'status': 'error',
                'message': f'No form fields found for parent entity {parent_entity_class} with shape {parent_entity_shape}'
            }), 404

        parent_fields = all_form_fields[parent_entity_key]
        if predicate_uri not in parent_fields:
            return jsonify({
                'status': 'error',
                'message': f'No field definition found for predicate {predicate_uri} in parent entity'
            }), 404

        field_details_list = parent_fields[predicate_uri]

        # Look through the 'or' alternatives of each field definition for the
        # one matching the requested class and shape; the first match wins.
        target_details = None
        for details in field_details_list:
            if details.get('or'):
                for shape_info in details['or']:
                    if (shape_info.get('entityType') == entity_class and
                            shape_info.get('nodeShape') == entity_shape):
                        target_details = shape_info
                        break
                if target_details:
                    break

        if not target_details:
            return jsonify({
                'status': 'error',
                'message': f'No matching shape info found for {entity_class}/{entity_shape} in parent predicate {predicate_uri}'
            }), 404

        template_string = '''
        {% from 'macros.jinja' import render_form_field with context %}
        {{ render_form_field(parent_entity_class, predicate_uri, shape_info, all_form_fields, depth, is_template=is_template) }}
        '''

        html = render_template_string(
            template_string,
            parent_entity_class=parent_entity_class,
            predicate_uri=predicate_uri,
            shape_info=target_details,
            all_form_fields=all_form_fields,
            depth=depth,
            is_template=is_template
        )

        return html

    except Exception as e:
        current_app.logger.error(f"Error rendering nested form HTML: {e}")
        current_app.logger.error(f"Full traceback: {traceback.format_exc()}")

        return jsonify({
            'status': 'error',
            'message': f'Failed to render nested form: {str(e)}'
        }), 500