Coverage for heritrace/routes/api.py: 99%

503 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-11-26 11:33 +0000

1# heritrace/routes/api.py 

2 

3import traceback 

4from typing import Dict, Optional 

5 

6import validators 

7from flask import (Blueprint, current_app, g, jsonify, render_template_string, 

8 request) 

9from flask_babel import gettext 

10from flask_login import current_user, login_required 

11from rdflib import RDF, XSD, Graph, Literal, URIRef 

12 

13from heritrace.apis.orcid import get_responsible_agent_uri 

14from heritrace.editor import Editor 

15from heritrace.extensions import (get_custom_filter, get_dataset_endpoint, 

16 get_form_fields, get_provenance_endpoint) 

17from heritrace.services.resource_lock_manager import LockStatus 

18from heritrace.utils.datatypes import DATATYPE_MAPPING 

19from heritrace.utils.primary_source_utils import \ 

20 save_user_default_primary_source 

21from heritrace.utils.shacl_utils import determine_shape_for_classes 

22from heritrace.utils.shacl_validation import validate_new_triple 

23from heritrace.utils.sparql_utils import (find_orphaned_entities, 

24 get_available_classes, 

25 get_catalog_data, 

26 get_deleted_entities_with_filtering, 

27 get_triples_from_graph, 

28 import_entity_graph, 

29 import_referenced_entities) 

30from heritrace.utils.strategies import (OrphanHandlingStrategy, 

31 ProxyHandlingStrategy) 

32from heritrace.utils.uri_utils import generate_unique_uri 

33from heritrace.utils.virtual_properties import \ 

34 transform_changes_with_virtual_properties 

35 

36api_bp = Blueprint("api", __name__) 

37 

38 

@api_bp.route("/catalogue")
@login_required
def catalogue_api():
    """Return one page of catalogue data as JSON.

    Query parameters:
        class: Class used to filter the catalogue (optional).
        shape: Shape passed along with the class (optional).
        page: Page number, defaults to 1. A value that is not an integer
            falls back to 1.
        per_page: Page size. Values that are not integers, or that are not
            listed in ``CATALOGUE_ALLOWED_PER_PAGE``, are replaced by
            ``CATALOGUE_DEFAULT_PER_PAGE``.
        sort_property: Property to sort by. A missing/empty value or the
            string "null" (any letter case) is treated as "no sort property".
        sort_direction: Sort direction, defaults to "ASC".

    Returns:
        The dictionary returned by ``get_catalog_data`` with an extra
        ``available_classes`` entry, serialised through ``jsonify``.
    """
    default_per_page = current_app.config["CATALOGUE_DEFAULT_PER_PAGE"]

    selected_class = request.args.get("class")
    selected_shape = request.args.get("shape")
    # ``type=int`` returns the default when the conversion fails, so a request
    # such as ``?page=abc`` no longer raises ValueError (an HTTP 500).
    page = request.args.get("page", 1, type=int)
    per_page = request.args.get("per_page", int(default_per_page), type=int)
    sort_property = request.args.get("sort_property")
    sort_direction = request.args.get("sort_direction", "ASC")

    # Only page sizes explicitly allowed by the configuration are honoured.
    if per_page not in current_app.config["CATALOGUE_ALLOWED_PER_PAGE"]:
        per_page = default_per_page

    # The client may send the literal string "null" for "no sorting".
    if not sort_property or sort_property.lower() == "null":
        sort_property = None

    available_classes = get_available_classes()

    catalog_data = get_catalog_data(
        selected_class=selected_class,
        page=page,
        per_page=per_page,
        sort_property=sort_property,
        sort_direction=sort_direction,
        selected_shape=selected_shape,
    )

    catalog_data["available_classes"] = available_classes
    return jsonify(catalog_data)

69 

70 

@api_bp.route("/time-vault")
@login_required
def get_deleted_entities_api():
    """Return one page of deleted entities as JSON, with sorting metadata.

    Query parameters:
        class: Class used to filter the deleted entities (optional).
        shape: Shape passed along with the class (optional).
        page: Page number, defaults to 1. A value that is not an integer
            falls back to 1.
        per_page: Page size. Values that are not integers, or that are not
            listed in ``CATALOGUE_ALLOWED_PER_PAGE``, are replaced by
            ``CATALOGUE_DEFAULT_PER_PAGE``.
        sort_property: Property to sort by, defaults to "deletionTime".
        sort_direction: Sort direction, defaults to "DESC".

    Retrieval and filtering are delegated to
    ``get_deleted_entities_with_filtering``; the class and shape echoed in the
    response are the ones that helper returns, which may differ from the
    requested ones.

    Returns:
        A JSON object with the entities, pagination figures, the effective
        sort and selection values, the available classes and the sortable
        properties.
    """
    default_per_page = current_app.config["CATALOGUE_DEFAULT_PER_PAGE"]

    selected_class = request.args.get("class")
    selected_shape = request.args.get("shape")
    # ``type=int`` returns the default when the conversion fails, so a request
    # such as ``?page=abc`` no longer raises ValueError (an HTTP 500).
    page = request.args.get("page", 1, type=int)
    per_page = request.args.get("per_page", int(default_per_page), type=int)
    sort_property = request.args.get("sort_property", "deletionTime")
    sort_direction = request.args.get("sort_direction", "DESC")

    # Only page sizes explicitly allowed by the configuration are honoured.
    if per_page not in current_app.config["CATALOGUE_ALLOWED_PER_PAGE"]:
        per_page = default_per_page

    (
        deleted_entities,
        available_classes,
        selected_class,
        selected_shape,
        sortable_properties,
        total_count,
    ) = get_deleted_entities_with_filtering(
        page, per_page, sort_property, sort_direction, selected_class, selected_shape
    )

    # Ceiling division; an empty result set is reported as zero pages.
    total_pages = (total_count + per_page - 1) // per_page if total_count > 0 else 0

    return jsonify(
        {
            "entities": deleted_entities,
            "total_pages": total_pages,
            "current_page": page,
            "per_page": per_page,
            "total_count": total_count,
            "sort_property": sort_property,
            "sort_direction": sort_direction,
            "selected_class": selected_class,
            "selected_shape": selected_shape,
            "available_classes": available_classes,
            "sortable_properties": sortable_properties,
        }
    )

110 

111 

@api_bp.route("/check-lock", methods=["POST"])
@login_required
def check_lock():
    """Report whether the resource named in the JSON body is locked.

    Reads ``resource_uri`` from the JSON payload and asks
    ``g.resource_lock_manager`` for its status.

    Responses:
        200 with status "locked" (naming the holder) or "available".
        400 when no resource URI is supplied.
        500 when the lock manager reports an error or anything raises.
    """
    try:
        payload = request.get_json()
        target_uri = payload.get("resource_uri")

        if not target_uri:
            missing_uri_body = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri_body), 400

        lock_state, holder = g.resource_lock_manager.check_lock_status(target_uri)

        if lock_state == LockStatus.LOCKED:
            locked_body = {
                "status": "locked",
                "title": gettext("Resource Locked"),
                "message": gettext(
                    "This resource is currently being edited by %(user)s [%(orcid)s]",
                    user=holder.user_name,
                    orcid=holder.user_id,
                ),
            }
            return jsonify(locked_body)

        if lock_state == LockStatus.ERROR:
            failure_body = {
                "status": "error",
                "title": gettext("Error"),
                "message": gettext("An error occurred while checking the lock"),
            }
            return jsonify(failure_body), 500

        # Any other status means nobody else holds the resource.
        return jsonify({"status": "available"})

    except Exception as e:
        current_app.logger.error(f"Error in check_lock: {str(e)}")
        unexpected_body = {
            "status": "error",
            "title": gettext("Error"),
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

168 

169 

@api_bp.route("/acquire-lock", methods=["POST"])
@login_required
def acquire_lock():
    """Try to lock the resource named in the JSON body.

    Reads ``resource_uri`` and the optional ``linked_resources`` list from
    the JSON payload. The lock status is checked first; only when the
    resource is not reported as locked is the acquisition attempted.

    Responses:
        200 with status "success" when the lock is acquired, or status
            "locked" (naming the holder) when the status check says so.
        400 when no resource URI is supplied.
        423 when the lock manager refuses the acquisition.
        500 when anything raises.
    """
    try:
        payload = request.get_json()
        target_uri = payload.get("resource_uri")
        related_uris = payload.get("linked_resources", [])

        if not target_uri:
            missing_uri_body = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri_body), 400

        # Look before locking: report the current holder instead of failing.
        lock_state, holder = g.resource_lock_manager.check_lock_status(target_uri)
        if lock_state == LockStatus.LOCKED:
            locked_body = {
                "status": "locked",
                "title": gettext("Resource Locked"),
                "message": gettext(
                    "This resource is currently being edited by %(user)s [%(orcid)s]",
                    user=holder.user_name,
                    orcid=holder.user_id,
                ),
            }
            return jsonify(locked_body), 200

        # The caller-provided linked resources are handed to the lock manager.
        acquired = g.resource_lock_manager.acquire_lock(target_uri, related_uris)
        if not acquired:
            refused_body = {
                "status": "error",
                "message": gettext("Resource is locked by another user"),
            }
            return jsonify(refused_body), 423

        return jsonify({"status": "success"})

    except Exception as e:
        current_app.logger.error(f"Error in acquire_lock: {str(e)}")
        unexpected_body = {
            "status": "error",
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

229 

230 

@api_bp.route("/release-lock", methods=["POST"])
@login_required
def release_lock():
    """Release the lock on the resource named in the JSON body.

    Responses:
        200 with status "success" when the lock manager releases the lock.
        400 when no resource URI is supplied or the release is refused.
        500 when anything raises.
    """
    try:
        payload = request.get_json()
        target_uri = payload.get("resource_uri")

        if not target_uri:
            missing_uri_body = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri_body), 400

        released = g.resource_lock_manager.release_lock(target_uri)
        if not released:
            refused_body = {
                "status": "error",
                "message": gettext("Unable to release lock"),
            }
            return jsonify(refused_body), 400

        return jsonify({"status": "success"})

    except Exception as e:
        current_app.logger.error(f"Error in release_lock: {str(e)}")
        unexpected_body = {
            "status": "error",
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

265 

266 

@api_bp.route("/renew-lock", methods=["POST"])
@login_required
def renew_lock():
    """Renew the lock on the resource named in the JSON body.

    Renewal is implemented as a fresh ``acquire_lock`` call with an empty
    list of linked resources.

    Responses:
        200 with status "success" when the lock manager accepts the renewal.
        400 when no resource URI is supplied.
        423 when the renewal is refused.
        500 when anything raises.
    """
    try:
        payload = request.get_json()
        target_uri = payload.get("resource_uri")

        if not target_uri:
            missing_uri_body = {
                "status": "error",
                "message": gettext("No resource URI provided"),
            }
            return jsonify(missing_uri_body), 400

        # Linked resources are not re-evaluated on renewal: only the existing
        # lock is refreshed, hence the empty list.
        renewed = g.resource_lock_manager.acquire_lock(target_uri, [])
        if not renewed:
            refused_body = {
                "status": "error",
                "message": gettext("Unable to renew lock"),
            }
            return jsonify(refused_body), 423

        return jsonify({"status": "success"})

    except Exception as e:
        current_app.logger.error(f"Error in renew_lock: {str(e)}")
        unexpected_body = {
            "status": "error",
            "message": gettext("An unexpected error occurred"),
        }
        return jsonify(unexpected_body), 500

303 

304 

@api_bp.route("/validate-literal", methods=["POST"])
@login_required
def validate_literal():
    """List the datatypes whose validator accepts the submitted value.

    Reads ``value`` from the JSON body and runs it through every validation
    function in ``DATATYPE_MAPPING``.

    Responses:
        200 with ``valid_datatypes``: the matching datatypes as strings, in
            mapping order.
        400 when the value is missing/empty or no datatype matches.
    """
    candidate = request.json.get("value")
    if not candidate:
        return jsonify({"error": gettext("Value is required.")}), 400

    accepted = [
        str(datatype)
        for datatype, accepts, _ in DATATYPE_MAPPING
        if accepts(candidate)
    ]

    if accepted:
        return jsonify({"valid_datatypes": accepted}), 200

    return jsonify({"error": gettext("No matching datatypes found.")}), 400

322 

323 

@api_bp.route("/check_orphans", methods=["POST"])
@login_required
def check_orphans():
    """Report entities that the requested deletions would leave orphaned.

    The JSON body must contain ``changes`` and ``entity_type``;
    ``entity_shape`` is optional. For every change whose action is "delete",
    ``find_orphaned_entities`` is asked for orphans and for intermediate
    (proxy) entities. Each kind is collected only when its configured
    strategy (``ORPHAN_HANDLING_STRATEGY`` / ``PROXY_HANDLING_STRATEGY``,
    both defaulting to KEEP) is DELETE or ASK.

    Responses:
        200 with ``affected_entities`` (orphans first, then proxies). When
            the list is not empty the payload also carries ``should_delete``
            (true only when both strategies are DELETE) and both strategy
            values.
        400 on a malformed request or when a ValueError is raised.
        500 on any other exception.
    """
    try:
        orphan_strategy = current_app.config.get(
            "ORPHAN_HANDLING_STRATEGY", OrphanHandlingStrategy.KEEP
        )
        proxy_strategy = current_app.config.get(
            "PROXY_HANDLING_STRATEGY", ProxyHandlingStrategy.KEEP
        )

        body = request.json
        if not body or "changes" not in body or "entity_type" not in body:
            invalid_body = {
                "status": "error",
                "error_type": "validation",
                "message": gettext(
                    "Invalid request: 'changes' and 'entity_type' are required fields"
                ),
            }
            return jsonify(invalid_body), 400

        requested_changes = body.get("changes", [])
        entity_type = body.get("entity_type")
        entity_shape = body.get("entity_shape")
        custom_filter = get_custom_filter()

        # A kind of entity is looked for only when its strategy acts on it.
        wants_orphans = orphan_strategy in (
            OrphanHandlingStrategy.DELETE,
            OrphanHandlingStrategy.ASK,
        )
        wants_proxies = proxy_strategy in (
            ProxyHandlingStrategy.DELETE,
            ProxyHandlingStrategy.ASK,
        )

        orphan_hits = []
        proxy_hits = []
        if wants_orphans or wants_proxies:
            for change in requested_changes:
                if change["action"] != "delete":
                    continue
                found_orphans, found_intermediates = find_orphaned_entities(
                    change["subject"],
                    entity_type,
                    change.get("predicate"),
                    change.get("object"),
                )
                if wants_orphans:
                    orphan_hits.extend(found_orphans)
                if wants_proxies:
                    proxy_hits.extend(found_intermediates)

        # Nothing to report when each kind is either kept or simply absent.
        orphans_matter = orphan_strategy != OrphanHandlingStrategy.KEEP and orphan_hits
        proxies_matter = proxy_strategy != ProxyHandlingStrategy.KEEP and proxy_hits
        if not orphans_matter and not proxies_matter:
            return jsonify({"status": "success", "affected_entities": []})

        # One unified, display-ready list: orphans first, then proxies.
        affected_entities = []
        for group, intermediate_flag in ((orphan_hits, False), (proxy_hits, True)):
            for entity in group:
                affected_entities.append(
                    {
                        "uri": entity["uri"],
                        "label": custom_filter.human_readable_entity(
                            entity["uri"], (entity["type"], entity_shape)
                        ),
                        "type": custom_filter.human_readable_class(
                            (entity["type"], entity_shape)
                        ),
                        "is_intermediate": intermediate_flag,
                    }
                )

        # Deletion is automatic only when BOTH strategies are DELETE; in every
        # other combination the client is told not to delete on its own.
        delete_automatically = (
            orphan_strategy == OrphanHandlingStrategy.DELETE
            and proxy_strategy == ProxyHandlingStrategy.DELETE
        )

        return jsonify(
            {
                "status": "success",
                "affected_entities": affected_entities,
                "should_delete": delete_automatically,
                "orphan_strategy": orphan_strategy.value,
                "proxy_strategy": proxy_strategy.value,
            }
        )
    except ValueError as e:
        # ValueError is reported to the client as a validation problem.
        error_message = str(e)
        current_app.logger.warning(
            f"Validation error in check_orphans: {error_message}"
        )
        validation_body = {
            "status": "error",
            "error_type": "validation",
            "message": gettext(
                "An error occurred while checking for orphaned entities"
            ),
        }
        return jsonify(validation_body), 400
    except Exception as e:
        # Everything else is logged with its traceback and reported as a
        # system error.
        error_message = f"Error checking orphans: {str(e)}"
        current_app.logger.error(f"{error_message}\n{traceback.format_exc()}")
        system_body = {
            "status": "error",
            "error_type": "system",
            "message": gettext(
                "An error occurred while checking for orphaned entities"
            ),
        }
        return jsonify(system_body), 500

477 

478 

@api_bp.route("/apply_changes", methods=["POST"])
@login_required
def apply_changes():
    """Apply a batch of create/delete/update/order changes and save them.

    Request body:
        A non-empty JSON list of change objects. Every object has an
        "action" ("create", "delete", "update" or "order") plus the keys that
        action reads ("subject", "predicate", "object", "newObject", "data",
        "entity_type", "entity_shape").

        Request-wide options are read from the FIRST list element only:
            "subject": (str) Main entity URI being modified,
            "primary_source": (str) Primary source to use for provenance,
            "save_default_source": (bool) Whether to save primary_source as
                default for the current user,
            "affected_entities": (list) Entities potentially affected by
                delete operations, as objects with "uri" and
                "is_intermediate",
            "delete_affected": (bool) Whether to delete affected entities

    Processing order:
        1. Import the main entity graph and the entities referenced by
           "create" changes, then mark the pre-existing data as complete.
        2. Run every "create" change.
        3. Delete the affected orphan/proxy entities, when requested.
        4. Run the "delete", "update" and "order" changes.
        5. Save through the editor.

    Responses:
        200 OK: Changes applied successfully
        400 Bad Request: Invalid request or validation error (ValueError)
        500 Internal Server Error: Server error while applying or saving
    """
    try:
        changes = request.get_json()
        if not changes:
            return jsonify({"error": "No request data provided"}), 400
        # The code below indexes the body as a list; any other JSON value
        # (e.g. a single object) used to fail on ``changes[0]`` and surface as
        # a 500 instead of being rejected as a bad request.
        if not isinstance(changes, list):
            return jsonify({"error": "Request body must be a list of changes"}), 400

        request_options = changes[0]
        subject = request_options.get("subject")
        affected_entities = request_options.get("affected_entities", [])
        delete_affected = request_options.get("delete_affected", False)
        primary_source = request_options.get("primary_source")
        save_default_source = request_options.get("save_default_source", False)

        if primary_source and not validators.url(primary_source):
            return jsonify({"error": "Invalid primary source URL"}), 400

        # Past the guard above, a truthy primary_source is a validated URL, so
        # it does not need to be validated again.
        if save_default_source and primary_source:
            save_user_default_primary_source(current_user.orcid, primary_source)

        changes = transform_changes_with_virtual_properties(changes)

        deleted_entities = set()
        editor = Editor(
            get_dataset_endpoint(),
            get_provenance_endpoint(),
            current_app.config["COUNTER_HANDLER"],
            URIRef(get_responsible_agent_uri(current_user.orcid)),
            current_app.config["PRIMARY_SOURCE"],
            current_app.config["DATASET_GENERATION_TIME"],
            dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        )

        if primary_source:
            editor.set_primary_source(primary_source)

        # A "delete" without a predicate removes a whole entity; in that case
        # the entities referencing it are imported as well.
        has_entity_deletion = any(
            change["action"] == "delete" and not change.get("predicate")
            for change in changes
        )

        editor = import_entity_graph(
            editor,
            subject,
            include_referencing_entities=has_entity_deletion,
        )

        for change in changes:
            if change["action"] == "create":
                data = change.get("data")
                if data:
                    import_referenced_entities(editor, data)

        editor.preexisting_finished()

        # In a quadstore, reuse the graph of the first quad found for the
        # main subject for every subsequent operation.
        graph_uri = None
        if editor.dataset_is_quadstore:
            for quad in editor.g_set.quads((URIRef(subject), None, None, None)):
                graph_uri = get_graph_uri_from_context(quad[3])
                break

        temp_id_to_uri = {}
        for change in changes:
            if change["action"] == "create":
                data = change.get("data")
                if data:
                    change_subject = change.get("subject")
                    created_subject = create_logic(
                        editor,
                        data,
                        change_subject,
                        graph_uri,
                        temp_id_to_uri=temp_id_to_uri,
                        parent_entity_type=None,
                    )
                    # Only update the main subject if this is the main entity
                    if change_subject is not None:
                        subject = created_subject

        orphan_strategy = current_app.config.get(
            "ORPHAN_HANDLING_STRATEGY", OrphanHandlingStrategy.KEEP
        )
        proxy_strategy = current_app.config.get(
            "PROXY_HANDLING_STRATEGY", ProxyHandlingStrategy.KEEP
        )

        # Deletions happen in two phases:
        #   1. first all orphaned/intermediate entities are removed;
        #   2. then the specific triples are removed.

        # Phase 1: delete orphaned/intermediate entities.
        if affected_entities and delete_affected:
            orphans = [
                entity for entity in affected_entities
                if not entity.get("is_intermediate")
            ]
            proxies = [
                entity for entity in affected_entities
                if entity.get("is_intermediate")
            ]

            # delete_affected is known to be set here, so each kind is deleted
            # whenever its strategy is DELETE or ASK.
            should_delete_orphans = orphan_strategy in (
                OrphanHandlingStrategy.DELETE,
                OrphanHandlingStrategy.ASK,
            )
            should_delete_proxies = proxy_strategy in (
                ProxyHandlingStrategy.DELETE,
                ProxyHandlingStrategy.ASK,
            )

            # Orphans are processed before proxies.
            entities_to_delete = []
            if should_delete_orphans:
                entities_to_delete.extend(orphans)
            if should_delete_proxies:
                entities_to_delete.extend(proxies)

            for entity in entities_to_delete:
                entity_uri = entity["uri"]
                if entity_uri in deleted_entities:
                    continue

                delete_logic(editor, entity_uri, graph_uri=graph_uri)
                deleted_entities.add(entity_uri)

        # Phase 2: process all the remaining changes.
        for change in changes:
            action = change["action"]

            if action == "delete":
                subject_uri = change["subject"]
                predicate = change.get("predicate")
                object_value = change.get("object")

                if not predicate:
                    # Deleting a whole entity.
                    if subject_uri in deleted_entities:
                        continue

                    delete_logic(
                        editor,
                        subject_uri,
                        graph_uri=graph_uri,
                        entity_type=change.get("entity_type"),
                        entity_shape=change.get("entity_shape"),
                    )
                    deleted_entities.add(subject_uri)
                elif object_value:
                    # Deleting a specific triple; skip it when its object is
                    # an entity that has already been deleted.
                    if object_value in deleted_entities:
                        continue

                    delete_logic(
                        editor,
                        subject_uri,
                        predicate,
                        object_value,
                        graph_uri,
                        change.get("entity_type"),
                        change.get("entity_shape"),
                    )

            elif action == "update":
                update_logic(
                    editor,
                    change["subject"],
                    change["predicate"],
                    change["object"],
                    change["newObject"],
                    graph_uri,
                    change.get("entity_type"),
                    change.get("entity_shape"),
                )
            elif action == "order":
                order_logic(
                    editor,
                    change["subject"],
                    change["predicate"],
                    change["object"],
                    change["newObject"],
                    graph_uri,
                    temp_id_to_uri,
                )

        try:
            editor.save()
        except ValueError as ve:
            # Re-raise ValueError so it can be caught by the outer try-except block
            current_app.logger.error(f"Error during save operation: {str(ve)}")
            raise
        except Exception as save_error:
            current_app.logger.error(f"Error during save operation: {str(save_error)}")
            return jsonify(
                {
                    "status": "error",
                    "error_type": "database",
                    "message": gettext("Failed to save changes to the database: {}").format(str(save_error)),
                }
            ), 500

        return (
            jsonify(
                {
                    "status": "success",
                    "message": gettext("Changes applied successfully"),
                }
            ),
            200,
        )

    except ValueError as e:
        # Handle validation errors specifically
        error_message = str(e)
        current_app.logger.warning(f"Validation error: {error_message}")
        return (
            jsonify(
                {
                    "status": "error",
                    "error_type": "validation",
                    "message": error_message,
                }
            ),
            400,
        )
    except Exception as e:
        # Handle other errors
        error_message = (
            f"Error while applying changes: {str(e)}\n{traceback.format_exc()}"
        )
        current_app.logger.error(error_message)
        return (
            jsonify(
                {
                    "status": "error",
                    "error_type": "system",
                    "message": gettext("An error occurred while applying changes"),
                }
            ),
            500,
        )

725 

726 

def get_graph_uri_from_context(graph_context):
    """Return the graph URI carried by a quad's context element.

    Args:
        graph_context: Either a ``Graph`` object or a direct URI reference.

    Returns:
        ``graph_context.identifier`` for a ``Graph`` instance; otherwise the
        value itself, unchanged.
    """
    return graph_context.identifier if isinstance(graph_context, Graph) else graph_context

740 

741 

def determine_datatype(value, datatype_uris):
    """Pick the first candidate datatype whose validator accepts ``value``.

    Args:
        value: The value to check.
        datatype_uris: Candidate datatype URIs, tried in order.

    Returns:
        ``URIRef`` of the first candidate that has an entry in
        ``DATATYPE_MAPPING`` whose validation function accepts the value;
        ``XSD.string`` when no candidate qualifies.
    """
    for candidate in datatype_uris:
        wanted = str(candidate)

        # Find the validator of the first mapping entry for this datatype.
        validator = None
        for entry in DATATYPE_MAPPING:
            if str(entry[0]) == wanted:
                validator = entry[1]
                break

        if validator and validator(value):
            return URIRef(candidate)

    # No candidate matched: fall back to a plain string.
    return XSD.string

751 

752 

def create_logic(
    editor: Editor,
    data: Dict[str, dict],
    subject=None,
    graph_uri=None,
    parent_subject=None,
    parent_predicate=None,
    temp_id_to_uri=None,
    parent_entity_type=None,
):
    """
    Create an entity, and recursively its nested entities, from a dictionary.

    When a parent subject is given, the entity's ``rdf:type`` triple is
    validated and written, and (if a parent predicate is given too) so is the
    triple linking the parent to this entity. Then every value under
    ``properties`` is written as one of four kinds:

    1. nested entity (a dict with ``entity_type``): created recursively under
       a freshly generated URI, with the current entity as its parent;
    2. existing entity reference (a dict with ``is_existing_entity``): linked
       through its ``entity_uri`` without validation;
    3. custom property (a dict with ``is_custom_property``): written without
       validation as a URI or a literal (``xsd:string`` unless ``datatype``
       is given);
    4. anything else: validated with ``validate_new_triple`` and written when
       a typed object is returned.

    Args:
        editor (Editor): The editor instance for graph operations.
        data (Dict[str, dict]): Description of the entity, with the keys
            ``entity_type``, ``properties`` and optionally ``tempId``.
        subject (optional): Subject URI of the entity. If None, a new URI is
            generated from the entity type and the data.
        graph_uri (optional): Graph passed to every editor operation.
        parent_subject (optional): Subject URI of the parent entity.
        parent_predicate (optional): Predicate linking the parent to this entity.
        temp_id_to_uri (Dict, optional): Filled with ``tempId -> subject URI``
            for every created entity that carries a ``tempId``.
        parent_entity_type (optional): Type of the parent entity, used when
            validating the parent link.

    Returns:
        The subject of the created entity.

    Raises:
        ValueError: When validation reports an error, when an existing entity
            reference has no ``entity_uri``, or when a custom property has an
            unknown ``type``.

    Example of `data` structure:
        {
            "entity_type": "http://purl.org/spar/fabio/JournalArticle",
            "properties": {
                "http://purl.org/spar/pro/isDocumentContextFor": [
                    {
                        "entity_type": "http://purl.org/spar/pro/RoleInTime",
                        "properties": {
                            "http://purl.org/spar/pro/isHeldBy": [
                                "https://w3id.org/oc/meta/ra/09110374"
                            ],
                            "http://purl.org/spar/pro/withRole": "http://purl.org/spar/pro/author"
                        },
                        "tempId": "temp-1"
                    }
                ]
            }
        }
    """
    entity_type = data.get("entity_type")
    properties = data.get("properties", {})
    temp_id = data.get("tempId")

    if subject is None:
        subject = generate_unique_uri(entity_type, data)

    # Let the caller resolve the frontend's temporary id to the real URI.
    if temp_id and temp_id_to_uri is not None:
        temp_id_to_uri[temp_id] = str(subject)

    if parent_subject is not None:
        # The type triple is written here only for entities that have a parent.
        type_value, _, type_error = validate_new_triple(
            subject, RDF.type, entity_type, "create", entity_types=entity_type
        )
        if type_error:
            raise ValueError(type_error)
        if type_value is not None:
            editor.create(URIRef(subject), RDF.type, type_value, graph_uri)

        if parent_subject and parent_predicate:
            # The link is validated against the parent's type, so that the
            # parent is checked for being allowed to hold this relationship.
            link_value, _, link_error = validate_new_triple(
                parent_subject,
                parent_predicate,
                subject,
                "create",
                entity_types=parent_entity_type,
            )
            if link_error:
                raise ValueError(link_error)
            if link_value is not None:
                editor.create(
                    URIRef(parent_subject),
                    URIRef(parent_predicate),
                    link_value,
                    graph_uri,
                )

    for predicate, raw_values in properties.items():
        value_list = raw_values if isinstance(raw_values, list) else [raw_values]

        for value in value_list:
            is_mapping = isinstance(value, dict)

            # Kind 1: nested entity, created recursively with the current
            # entity as its parent.
            if is_mapping and "entity_type" in value:
                nested_subject = generate_unique_uri(value["entity_type"])
                create_logic(
                    editor,
                    value,
                    nested_subject,
                    graph_uri,
                    subject,
                    predicate,
                    temp_id_to_uri,
                    parent_entity_type=entity_type,
                )
                continue

            # Kind 2: reference to an entity that already exists.
            if is_mapping and value.get("is_existing_entity", False):
                entity_uri = value.get("entity_uri")
                if not entity_uri:
                    raise ValueError("Missing entity_uri in existing entity reference")
                existing_ref = URIRef(entity_uri)
                editor.create(
                    URIRef(subject), URIRef(predicate), existing_ref, graph_uri
                )
                continue

            # Kind 3: custom property, written as given without validation.
            if is_mapping and value.get("is_custom_property", False):
                if value["type"] == "uri":
                    custom_object = URIRef(value["value"])
                elif value["type"] == "literal":
                    if "datatype" in value:
                        literal_datatype = URIRef(value["datatype"])
                    else:
                        literal_datatype = XSD.string
                    custom_object = Literal(value["value"], datatype=literal_datatype)
                else:
                    raise ValueError(f"Unknown custom property type: {value['type']}")

                editor.create(
                    URIRef(subject), URIRef(predicate), custom_object, graph_uri
                )
                continue

            # Kind 4: standard property; validation returns the correctly
            # typed RDF object to store.
            typed_object, _, value_error = validate_new_triple(
                subject, predicate, value, "create", entity_types=entity_type
            )
            if value_error:
                raise ValueError(value_error)
            if typed_object is not None:
                editor.create(
                    URIRef(subject), URIRef(predicate), typed_object, graph_uri
                )

    return subject

913 

914 

def update_logic(
    editor: Editor,
    subject,
    predicate,
    old_value,
    new_value,
    graph_uri=None,
    entity_type=None,
    entity_shape=None,
):
    """Validate the replacement of one triple's object and apply it.

    ``validate_new_triple`` is run in "update" mode; the new and old values
    it returns are the ones handed to ``editor.update``.

    Args:
        editor: The editor instance for graph operations.
        subject: Subject URI of the triple.
        predicate: Predicate URI of the triple.
        old_value: Object currently stored.
        new_value: Object that should replace it.
        graph_uri: Graph passed to the editor (optional).
        entity_type: Entity type forwarded to validation (optional).
        entity_shape: Entity shape forwarded to validation (optional).

    Raises:
        ValueError: When validation returns an error message.
    """
    validation_result = validate_new_triple(
        subject,
        predicate,
        new_value,
        "update",
        old_value,
        entity_types=entity_type,
        entity_shape=entity_shape,
    )
    typed_new, typed_old, problem = validation_result

    if problem:
        raise ValueError(problem)

    editor.update(URIRef(subject), URIRef(predicate), typed_old, typed_new, graph_uri)

932 

933 

def rebuild_entity_order(
    editor: Editor,
    ordered_by_uri: URIRef,
    entities: list,
    graph_uri=None
):
    """
    Rebuild the ordering chain for a list of entities.

    Every existing ``ordered_by_uri`` triple that has one of the entities as
    its subject is deleted; then each entity is linked to the one that
    follows it in the list.

    Args:
        editor: The editor instance
        ordered_by_uri: The property used for ordering
        entities: Entities in their desired order
        graph_uri: Optional graph URI

    Returns:
        The same editor instance.
    """
    # Step 1: drop every ordering link currently leaving these entities.
    for member in entities:
        stale_links = list(
            get_triples_from_graph(editor.g_set, (member, ordered_by_uri, None))
        )
        for _, _, linked in stale_links:
            editor.delete(member, ordered_by_uri, linked, graph_uri)

    # Step 2: chain each entity to its successor; the last one gets no link.
    for predecessor, successor in zip(entities, entities[1:]):
        editor.create(predecessor, ordered_by_uri, successor, graph_uri)

    return editor

961 

962 

def delete_logic(
    editor: Editor,
    subject,
    predicate=None,
    object_value=None,
    graph_uri=None,
    entity_type=None,
    entity_shape=None,
):
    """Delete through the editor, validating first when a triple is named.

    When both a predicate and an object are given, ``validate_new_triple`` is
    run in "delete" mode and the object it returns is the one deleted.
    Otherwise the arguments are forwarded as they are, with a missing
    predicate passed to the editor as ``None``.

    Args:
        editor: The editor instance for graph operations.
        subject: Subject URI.
        predicate: Predicate URI (optional).
        object_value: Object of the triple to delete (optional).
        graph_uri: Graph passed to the editor (optional).
        entity_type: Entity type forwarded to validation (optional).
        entity_shape: Entity shape forwarded to validation (optional).

    Raises:
        ValueError: When validation returns an error message.
    """
    # Convert up front so the editor always receives URIRef terms.
    subject_ref = URIRef(subject)
    predicate_ref = None
    if predicate:
        predicate_ref = URIRef(predicate)

    target_object = object_value
    if predicate and object_value:
        # Validation also yields the stored object in its correct RDF type.
        _, target_object, problem = validate_new_triple(
            subject,
            predicate,
            None,
            "delete",
            object_value,
            entity_types=entity_type,
            entity_shape=entity_shape,
        )
        if problem:
            raise ValueError(problem)

    editor.delete(subject_ref, predicate_ref, target_object, graph_uri)

986 

987 

def order_logic(
    editor: Editor,
    subject,
    predicate,
    new_order,
    ordered_by,
    graph_uri=None,
    temp_id_to_uri: Optional[Dict] = None,
):
    """
    Re-create the ordered entities attached to ``subject`` in a new order.

    Each entity currently linked to ``subject`` through ``predicate`` whose
    string form appears in ``new_order`` is replaced: the old entity and its
    link are deleted, a new URI is generated from its ``rdf:type``, the link
    is re-created, and all of its properties other than ``predicate`` and
    ``ordered_by`` are copied to the new entity. The ordering chain is then
    rebuilt over the entities listed in ``new_order``.

    Args:
        editor: The editor instance used for all reads and writes
        subject: Identifier of the parent entity
        predicate: Identifier of the property linking the parent to the
            ordered entities
        new_order: Entity identifiers in the desired order. Entries that were
            not re-created are resolved through ``temp_id_to_uri``, falling
            back to the entry itself
        ordered_by: Identifier of the property that expresses the ordering
        graph_uri: Optional graph URI forwarded to the editor
        temp_id_to_uri: Optional mapping from temporary IDs to URIs; ``None``
            is treated as an empty mapping

    Returns:
        The editor instance that was passed in

    Raises:
        ValueError: If a pre-existing entity has no ``rdf:type`` triple
    """
    # The parameter defaults to None; normalise it so the fallback lookup
    # further down cannot fail with AttributeError.
    if temp_id_to_uri is None:
        temp_id_to_uri = {}

    subject_uri = URIRef(subject)
    predicate_uri = URIRef(predicate)
    ordered_by_uri = URIRef(ordered_by)

    # Fetch all currently linked entities directly from the editor
    current_entities = [
        o for _, _, o in get_triples_from_graph(editor.g_set, (subject_uri, predicate_uri, None))
    ]

    # Maps each old entity to the entity that replaces it
    old_to_new_mapping = {}

    for old_entity in current_entities:
        if str(old_entity) in new_order:  # Only process pre-existing entities
            # Snapshot every property of the current entity before deleting it
            entity_properties = list(get_triples_from_graph(editor.g_set, (old_entity, None, None)))

            entity_type = next(
                (o for _, p, o in entity_properties if p == RDF.type), None
            )

            if entity_type is None:
                raise ValueError(
                    f"Impossibile determinare il tipo dell'entità per {old_entity}"
                )

            # Create the replacement entity
            new_entity_uri = generate_unique_uri(entity_type)
            old_to_new_mapping[old_entity] = new_entity_uri

            # Delete the old entity and its link from the parent
            editor.delete(subject_uri, predicate_uri, old_entity, graph_uri)
            editor.delete(old_entity, graph=graph_uri)

            # Re-create the link between the parent and the new entity
            editor.create(subject_uri, predicate_uri, new_entity_uri, graph_uri)

            # Restore every other property on the new entity; the ordering
            # property is skipped because the chain is rebuilt below
            for _, p, o in entity_properties:
                if p != predicate_uri and p != ordered_by_uri:
                    editor.create(new_entity_uri, p, o, graph_uri)

    # Build the list of entities in the new order
    ordered_entities = []
    for entity in new_order:
        new_entity_uri = old_to_new_mapping.get(URIRef(entity))
        if not new_entity_uri:
            # Not re-created above: resolve a temporary ID if one is known,
            # otherwise use the entry itself as the URI
            new_entity_uri = URIRef(temp_id_to_uri.get(entity, entity))
        ordered_entities.append(new_entity_uri)

    # Rebuild the ordering chain
    if ordered_entities:
        rebuild_entity_order(editor, ordered_by_uri, ordered_entities, graph_uri)

    return editor

1052 

1053 

@api_bp.route("/human-readable-entity", methods=["POST"])
@login_required
def get_human_readable_entity():
    """
    Return the human-readable representation of an entity.

    Reads ``uri`` and ``entity_class`` from the submitted form data, resolves
    the shape for that class and returns whatever the custom filter's
    ``human_readable_entity`` produces. Responds with a JSON error and status
    400 when either form parameter is absent.
    """
    custom_filter = get_custom_filter()

    form = request.form
    # Both parameters are mandatory
    if not ("uri" in form and "entity_class" in form):
        return jsonify({"status": "error", "message": "Missing required parameters"}), 400

    entity_uri = form["uri"]
    entity_class = form["entity_class"]
    entity_key = (entity_class, determine_shape_for_classes([entity_class]))
    return custom_filter.human_readable_entity(entity_uri, entity_key)

1069 

1070 

@api_bp.route('/format-source', methods=['POST'])
@login_required
def format_source_api():
    """
    API endpoint to format a source URL using the application's filters.
    Accepts POST request with JSON body: {"url": "source_url"}
    Returns JSON: {"formatted_html": "html_string"}

    Responds with status 400 and an ``error`` message when the body is not a
    JSON object or when ``url`` is missing, not a string, or not a valid URL.
    If the custom filter fails, the error is logged and a plain anchor for
    the (HTML-escaped) URL is returned instead.
    """
    from html import escape

    data = request.get_json()
    # A JSON body such as `null` or a list has no `.get`; treat it like a
    # missing URL instead of letting it surface as a server error.
    source_url = data.get('url') if isinstance(data, dict) else None

    if not isinstance(source_url, str) or not source_url or not validators.url(source_url):
        return jsonify({"error": gettext("Invalid or missing URL")}), 400

    try:
        custom_filter = get_custom_filter()
        formatted_html = custom_filter.format_source_reference(source_url)
        return jsonify({"formatted_html": formatted_html})
    except Exception as e:
        current_app.logger.error(f"Error formatting source URL '{source_url}': {e}")
        # The URL is client-supplied: escape it before embedding it in markup
        # so characters such as quotes or `&` cannot break out of the
        # attribute or the element content.
        safe_url = escape(source_url, quote=True)
        fallback_html = f'<a href="{safe_url}" target="_blank">{safe_url}</a>'
        return jsonify({"formatted_html": fallback_html})

1093 

1094 

@api_bp.route("/form-fields", methods=["GET"])
@login_required
def get_form_fields_for_entity():
    """
    Return the form_fields registered for one (entity class, shape) pair.

    Query parameters:
        entity_class: URI of the entity class
        entity_shape: URI of the entity shape

    Returns:
        JSON response. On success it carries ``status``, ``form_fields`` (a
        list of ``[property, details]`` pairs in the stored order) and
        ``entity_key``. Errors carry ``status`` and ``message`` with status
        400 (missing parameter), 404 (unknown pair) or 500 (form fields not
        initialized, or an unexpected exception).
    """

    def failure(message, status_code):
        # Shared shape of every error response produced by this endpoint
        return jsonify({
            'status': 'error',
            'message': message
        }), status_code

    try:
        class_uri = request.args.get('entity_class')
        shape_uri = request.args.get('entity_shape')

        if not (class_uri and shape_uri):
            return failure('Missing required parameters: entity_class and entity_shape', 400)

        all_form_fields = get_form_fields()
        if not all_form_fields:
            return failure('Form fields not initialized', 500)

        lookup_key = (class_uri, shape_uri)
        if lookup_key not in all_form_fields:
            return failure(
                f'No form fields found for entity class {class_uri} with shape {shape_uri}',
                404,
            )

        # Emit [property, details] pairs so the property order is kept in the
        # JSON output
        ordered_properties = [
            [prop, details_list]
            for prop, details_list in all_form_fields[lookup_key].items()
        ]

        return jsonify({
            'status': 'success',
            'form_fields': ordered_properties,
            'entity_key': [class_uri, shape_uri]
        })

    except Exception as e:
        current_app.logger.error(f"Error loading form fields for {class_uri}/{shape_uri}: {e}")
        current_app.logger.error(f"Full traceback: {traceback.format_exc()}")

        return failure(f'Failed to load form fields: {str(e)}', 500)

1157 

1158 

@api_bp.route("/render-form-fields", methods=["POST"])
@login_required
def render_form_fields_html():
    """
    Render form fields as HTML for dynamic loading.

    Expects JSON payload with:
    - entity_key: [entity_class, entity_shape] array

    Returns:
        HTML string of the rendered form fields. Errors are returned as JSON
        with ``status`` and ``message`` and status 400 (missing or malformed
        ``entity_key``), 404 (unknown entity key) or 500 (form fields not
        initialized, or an unexpected rendering failure).
    """
    try:
        data = request.get_json()

        if not data or 'entity_key' not in data:
            return jsonify({
                'status': 'error',
                'message': 'Missing required field: entity_key'
            }), 400

        entity_key = data['entity_key']  # This is [entity_class, entity_shape] array

        # A malformed key is a client error; reject it here rather than
        # letting the unpacking below fail and be reported as a server error.
        if not isinstance(entity_key, (list, tuple)) or len(entity_key) != 2:
            return jsonify({
                'status': 'error',
                'message': 'entity_key must be a [entity_class, entity_shape] array'
            }), 400

        entity_class, entity_shape = entity_key

        all_form_fields = get_form_fields()

        if not all_form_fields:
            return jsonify({
                'status': 'error',
                'message': 'Form fields not initialized'
            }), 500

        tuple_key = (entity_class, entity_shape)
        if tuple_key not in all_form_fields:
            return jsonify({
                'status': 'error',
                'message': f'No form fields found for entity {entity_class} with shape {entity_shape}'
            }), 404

        entity_form_fields = all_form_fields[tuple_key]

        # [property, details] pairs keep the property order inside the template
        form_fields_array = [[prop, details_list] for prop, details_list in entity_form_fields.items()]

        template_string = '''
        {% from 'macros.jinja' import render_form_field with context %}

        {% set entity_type = entity_class %}
        {% set entity_shape = entity_shape %}
        {% set group_id = ((entity_type, entity_shape) | human_readable_class + "_group") | replace(" ", "_") %}
        <div class="property-group mb-3" id="{{ group_id }}" data-uri="{{ entity_type }}" data-shape="{{ entity_shape }}">
            {% for prop_data in ordered_form_fields %}
                {% set prop = prop_data[0] %}
                {% set details_list = prop_data[1] %}
                {% for details in details_list %}
                    {{ render_form_field(entity_type, prop, details, all_form_fields) }}
                {% endfor %}
            {% endfor %}
        </div>
        '''

        html = render_template_string(
            template_string,
            entity_class=entity_class,
            entity_shape=entity_shape,
            ordered_form_fields=form_fields_array,
            all_form_fields=all_form_fields
        )

        return html

    except Exception as e:
        current_app.logger.error(f"Error rendering form fields HTML: {e}")
        current_app.logger.error(f"Full traceback: {traceback.format_exc()}")

        return jsonify({
            'status': 'error',
            'message': f'Failed to render form fields: {str(e)}'
        }), 500

1237 

1238 

@api_bp.route("/render-nested-form", methods=["POST"])
@login_required
def render_nested_form_html():
    """
    Render a single nested form for lazy loading in sh:or contexts.

    Expects JSON payload with:
    - parent_entity_class: The parent entity class URI
    - parent_entity_shape: The parent entity shape URI
    - entity_class: The sub-entity class URI to render
    - entity_shape: The sub-entity shape URI to render
    - predicate_uri: The predicate URI linking parent to sub-entity
    - depth: The nesting depth
    - is_template: Whether this is a template item (optional, defaults to False)

    Returns:
        HTML string of the rendered nested form. Errors are returned as JSON
        with ``status`` and ``message`` and status 400 (no payload or missing
        fields), 404 (unknown parent, predicate or shape) or 500 (form fields
        not initialized, or an unexpected failure).
    """
    try:
        data = request.get_json()

        required_fields = ['parent_entity_class', 'parent_entity_shape', 'entity_class', 'entity_shape', 'predicate_uri', 'depth']

        if not data:
            return jsonify({
                'status': 'error',
                'message': 'No JSON data provided'
            }), 400

        missing_fields = [field for field in required_fields if field not in data]
        if missing_fields:
            # Report only the fields that are actually absent
            return jsonify({
                'status': 'error',
                'message': f'Missing required fields: {", ".join(missing_fields)}'
            }), 400

        parent_entity_class = data['parent_entity_class']
        parent_entity_shape = data['parent_entity_shape']
        entity_class = data['entity_class']
        entity_shape = data['entity_shape']
        predicate_uri = data['predicate_uri']
        depth = int(data['depth'])
        is_template = data.get('is_template', False)

        all_form_fields = get_form_fields()

        if not all_form_fields:
            return jsonify({
                'status': 'error',
                'message': 'Form fields not initialized'
            }), 500

        parent_entity_key = (parent_entity_class, parent_entity_shape)
        if parent_entity_key not in all_form_fields:
            return jsonify({
                'status': 'error',
                'message': f'No form fields found for parent entity {parent_entity_class} with shape {parent_entity_shape}'
            }), 404

        parent_fields = all_form_fields[parent_entity_key]
        if predicate_uri not in parent_fields:
            return jsonify({
                'status': 'error',
                'message': f'No field definition found for predicate {predicate_uri} in parent entity'
            }), 404

        field_details_list = parent_fields[predicate_uri]

        # First 'or' alternative of the predicate whose entityType and
        # nodeShape both match the requested sub-entity
        target_details = next(
            (
                shape_info
                for details in field_details_list
                for shape_info in (details.get('or') or ())
                if shape_info.get('entityType') == entity_class
                and shape_info.get('nodeShape') == entity_shape
            ),
            None,
        )

        if not target_details:
            return jsonify({
                'status': 'error',
                'message': f'No matching shape info found for {entity_class}/{entity_shape} in parent predicate {predicate_uri}'
            }), 404

        template_string = '''
        {% from 'macros.jinja' import render_form_field with context %}
        {{ render_form_field(parent_entity_class, predicate_uri, shape_info, all_form_fields, depth, is_template=is_template) }}
        '''

        html = render_template_string(
            template_string,
            parent_entity_class=parent_entity_class,
            predicate_uri=predicate_uri,
            shape_info=target_details,
            all_form_fields=all_form_fields,
            depth=depth,
            is_template=is_template
        )

        return html

    except Exception as e:
        current_app.logger.error(f"Error rendering nested form HTML: {e}")
        current_app.logger.error(f"Full traceback: {traceback.format_exc()}")

        return jsonify({
            'status': 'error',
            'message': f'Failed to render nested form: {str(e)}'
        }), 500