Coverage for heritrace/routes/api.py: 100%

385 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-06-24 11:39 +0000

1# heritrace/routes/api.py 

2 

3import traceback 

4from typing import Dict, Optional 

5 

6import validators 

7from flask import Blueprint, current_app, g, jsonify, request 

8from flask_babel import gettext 

9from flask_login import current_user, login_required 

10from heritrace.editor import Editor 

11from heritrace.extensions import (get_custom_filter, get_dataset_endpoint, 

12 get_provenance_endpoint) 

13from heritrace.services.resource_lock_manager import LockStatus 

14from heritrace.utils.shacl_utils import determine_shape_for_classes 

15from heritrace.utils.primary_source_utils import \ 

16 save_user_default_primary_source 

17from heritrace.utils.shacl_validation import validate_new_triple 

18from heritrace.utils.sparql_utils import (find_orphaned_entities, 

19 get_available_classes, 

20 get_catalog_data, 

21 get_deleted_entities_with_filtering, 

22 import_entity_graph) 

23from heritrace.utils.strategies import (OrphanHandlingStrategy, 

24 ProxyHandlingStrategy) 

25from heritrace.utils.uri_utils import generate_unique_uri 

26from rdflib import RDF, XSD, Graph, URIRef 

27from resources.datatypes import DATATYPE_MAPPING 

28 

29api_bp = Blueprint("api", __name__) 

30 

31 

32@api_bp.route("/catalogue") 

33@login_required 

34def catalogue_api(): 

35 selected_class = request.args.get("class") 

36 selected_shape = request.args.get("shape") 

37 page = int(request.args.get("page", 1)) 

38 per_page = int(request.args.get("per_page", 50)) 

39 sort_property = request.args.get("sort_property") 

40 sort_direction = request.args.get("sort_direction", "ASC") 

41 

42 allowed_per_page = [50, 100, 200, 500] 

43 if per_page not in allowed_per_page: 

44 per_page = 50 

45 

46 if not sort_property or sort_property.lower() == "null": 

47 sort_property = None 

48 

49 available_classes = get_available_classes() 

50 

51 catalog_data = get_catalog_data( 

52 selected_class=selected_class, 

53 page=page, 

54 per_page=per_page, 

55 sort_property=sort_property, 

56 sort_direction=sort_direction, 

57 selected_shape=selected_shape 

58 ) 

59 

60 catalog_data["available_classes"] = available_classes 

61 return jsonify(catalog_data) 

62 

63 

64@api_bp.route("/time-vault") 

65@login_required 

66def get_deleted_entities_api(): 

67 """ 

68 API endpoint to retrieve deleted entities with pagination and sorting. 

69 Only processes and returns entities whose classes are marked as visible. 

70 """ 

71 selected_class = request.args.get("class") 

72 selected_shape = request.args.get("shape") 

73 page = int(request.args.get("page", 1)) 

74 per_page = int(request.args.get("per_page", 50)) 

75 sort_property = request.args.get("sort_property", "deletionTime") 

76 sort_direction = request.args.get("sort_direction", "DESC") 

77 

78 allowed_per_page = [50, 100, 200, 500] 

79 if per_page not in allowed_per_page: 

80 per_page = 50 

81 

82 deleted_entities, available_classes, selected_class, selected_shape, sortable_properties, total_count = ( 

83 get_deleted_entities_with_filtering( 

84 page, per_page, sort_property, sort_direction, selected_class, selected_shape 

85 ) 

86 ) 

87 

88 return jsonify( 

89 { 

90 "entities": deleted_entities, 

91 "total_pages": (total_count + per_page - 1) // per_page if total_count > 0 else 0, 

92 "current_page": page, 

93 "per_page": per_page, 

94 "total_count": total_count, 

95 "sort_property": sort_property, 

96 "sort_direction": sort_direction, 

97 "selected_class": selected_class, 

98 "selected_shape": selected_shape, 

99 "available_classes": available_classes, 

100 "sortable_properties": sortable_properties, 

101 } 

102 ) 

103 

104 

105@api_bp.route("/check-lock", methods=["POST"]) 

106@login_required 

107def check_lock(): 

108 """Check if a resource is locked.""" 

109 try: 

110 data = request.get_json() 

111 resource_uri = data.get("resource_uri") 

112 

113 if not resource_uri: 

114 return ( 

115 jsonify( 

116 {"status": "error", "message": gettext("No resource URI provided")} 

117 ), 

118 400, 

119 ) 

120 

121 status, lock_info = g.resource_lock_manager.check_lock_status(resource_uri) 

122 

123 if status == LockStatus.LOCKED: 

124 return jsonify( 

125 { 

126 "status": "locked", 

127 "title": gettext("Resource Locked"), 

128 "message": gettext( 

129 "This resource is currently being edited by %(user)s [%(orcid)s]", 

130 user=lock_info.user_name, 

131 orcid=lock_info.user_id, 

132 ), 

133 } 

134 ) 

135 elif status == LockStatus.ERROR: 

136 return ( 

137 jsonify( 

138 { 

139 "status": "error", 

140 "title": gettext("Error"), 

141 "message": gettext("An error occurred while checking the lock"), 

142 } 

143 ), 

144 500, 

145 ) 

146 else: 

147 return jsonify({"status": "available"}) 

148 

149 except Exception as e: 

150 current_app.logger.error(f"Error in check_lock: {str(e)}") 

151 return ( 

152 jsonify( 

153 { 

154 "status": "error", 

155 "title": gettext("Error"), 

156 "message": gettext("An unexpected error occurred"), 

157 } 

158 ), 

159 500, 

160 ) 

161 

162 

163@api_bp.route("/acquire-lock", methods=["POST"]) 

164@login_required 

165def acquire_lock(): 

166 """Try to acquire a lock on a resource.""" 

167 try: 

168 data = request.get_json() 

169 resource_uri = data.get("resource_uri") 

170 linked_resources = data.get("linked_resources", []) 

171 

172 if not resource_uri: 

173 return ( 

174 jsonify( 

175 {"status": "error", "message": gettext("No resource URI provided")} 

176 ), 

177 400, 

178 ) 

179 

180 # First check if the resource or any related resource is locked by another user 

181 status, lock_info = g.resource_lock_manager.check_lock_status(resource_uri) 

182 if status == LockStatus.LOCKED: 

183 return ( 

184 jsonify( 

185 { 

186 "status": "locked", 

187 "title": gettext("Resource Locked"), 

188 "message": gettext( 

189 "This resource is currently being edited by %(user)s [%(orcid)s]", 

190 user=lock_info.user_name, 

191 orcid=lock_info.user_id, 

192 ), 

193 } 

194 ), 

195 200, 

196 ) 

197 

198 # Use the provided linked_resources 

199 success = g.resource_lock_manager.acquire_lock(resource_uri, linked_resources) 

200 

201 if success: 

202 return jsonify({"status": "success"}) 

203 

204 return ( 

205 jsonify( 

206 { 

207 "status": "error", 

208 "message": gettext("Resource is locked by another user"), 

209 } 

210 ), 

211 423, 

212 ) 

213 

214 except Exception as e: 

215 current_app.logger.error(f"Error in acquire_lock: {str(e)}") 

216 return ( 

217 jsonify( 

218 {"status": "error", "message": gettext("An unexpected error occurred")} 

219 ), 

220 500, 

221 ) 

222 

223 

224@api_bp.route("/release-lock", methods=["POST"]) 

225@login_required 

226def release_lock(): 

227 """Release a lock on a resource.""" 

228 try: 

229 data = request.get_json() 

230 resource_uri = data.get("resource_uri") 

231 

232 if not resource_uri: 

233 return ( 

234 jsonify( 

235 {"status": "error", "message": gettext("No resource URI provided")} 

236 ), 

237 400, 

238 ) 

239 

240 success = g.resource_lock_manager.release_lock(resource_uri) 

241 

242 if success: 

243 return jsonify({"status": "success"}) 

244 

245 return ( 

246 jsonify({"status": "error", "message": gettext("Unable to release lock")}), 

247 400, 

248 ) 

249 

250 except Exception as e: 

251 current_app.logger.error(f"Error in release_lock: {str(e)}") 

252 return ( 

253 jsonify( 

254 {"status": "error", "message": gettext("An unexpected error occurred")} 

255 ), 

256 500, 

257 ) 

258 

259 

260@api_bp.route("/renew-lock", methods=["POST"]) 

261@login_required 

262def renew_lock(): 

263 """Renew an existing lock on a resource.""" 

264 try: 

265 data = request.get_json() 

266 resource_uri = data.get("resource_uri") 

267 

268 if not resource_uri: 

269 return ( 

270 jsonify( 

271 {"status": "error", "message": gettext("No resource URI provided")} 

272 ), 

273 400, 

274 ) 

275 

276 # When renewing a lock, we don't need to check for linked resources again 

277 # Just pass an empty list as we're only refreshing the existing lock 

278 success = g.resource_lock_manager.acquire_lock(resource_uri, []) 

279 

280 if success: 

281 return jsonify({"status": "success"}) 

282 

283 return ( 

284 jsonify({"status": "error", "message": gettext("Unable to renew lock")}), 

285 423, 

286 ) 

287 

288 except Exception as e: 

289 current_app.logger.error(f"Error in renew_lock: {str(e)}") 

290 return ( 

291 jsonify( 

292 {"status": "error", "message": gettext("An unexpected error occurred")} 

293 ), 

294 500, 

295 ) 

296 

297 

298@api_bp.route("/validate-literal", methods=["POST"]) 

299@login_required 

300def validate_literal(): 

301 """Validate a literal value and suggest appropriate datatypes.""" 

302 value = request.json.get("value") 

303 if not value: 

304 return jsonify({"error": gettext("Value is required.")}), 400 

305 

306 matching_datatypes = [] 

307 for datatype, validation_func, _ in DATATYPE_MAPPING: 

308 if validation_func(value): 

309 matching_datatypes.append(str(datatype)) 

310 

311 if not matching_datatypes: 

312 return jsonify({"error": gettext("No matching datatypes found.")}), 400 

313 

314 return jsonify({"valid_datatypes": matching_datatypes}), 200 

315 

316 

317@api_bp.route("/check_orphans", methods=["POST"]) 

318@login_required 

319def check_orphans(): 

320 """ 

321 Check for orphaned entities and intermediate relations (proxies) that would result from the requested changes. 

322 Applies separate handling strategies for orphans and proxies, but returns a unified report. 

323 """ 

324 try: 

325 # Get strategies from configuration 

326 orphan_strategy = current_app.config.get( 

327 "ORPHAN_HANDLING_STRATEGY", OrphanHandlingStrategy.KEEP 

328 ) 

329 proxy_strategy = current_app.config.get( 

330 "PROXY_HANDLING_STRATEGY", ProxyHandlingStrategy.KEEP 

331 ) 

332 

333 data = request.json 

334 # Validate required fields 

335 if not data or "changes" not in data or "entity_type" not in data: 

336 return ( 

337 jsonify( 

338 { 

339 "status": "error", 

340 "error_type": "validation", 

341 "message": gettext( 

342 "Invalid request: 'changes' and 'entity_type' are required fields" 

343 ), 

344 } 

345 ), 

346 400, 

347 ) 

348 

349 changes = data.get("changes", []) 

350 entity_type = data.get("entity_type") 

351 entity_shape = data.get("entity_shape") 

352 custom_filter = get_custom_filter() 

353 

354 orphans = [] 

355 intermediate_orphans = [] 

356 

357 # Check for orphans and proxies based on their respective strategies 

358 check_for_orphans = orphan_strategy in ( 

359 OrphanHandlingStrategy.DELETE, 

360 OrphanHandlingStrategy.ASK, 

361 ) 

362 check_for_proxies = proxy_strategy in ( 

363 ProxyHandlingStrategy.DELETE, 

364 ProxyHandlingStrategy.ASK, 

365 ) 

366 if check_for_orphans or check_for_proxies: 

367 for change in changes: 

368 if change["action"] == "delete": 

369 found_orphans, found_intermediates = find_orphaned_entities( 

370 change["subject"], 

371 entity_type, 

372 change.get("predicate"), 

373 change.get("object"), 

374 ) 

375 # Only collect orphans if we need to handle them 

376 if check_for_orphans: 

377 orphans.extend(found_orphans) 

378 

379 # Only collect proxies if we need to handle them 

380 if check_for_proxies: 

381 intermediate_orphans.extend(found_intermediates) 

382 

383 # If both strategies are KEEP or no entities found, return empty result 

384 if (orphan_strategy == OrphanHandlingStrategy.KEEP or not orphans) and ( 

385 proxy_strategy == ProxyHandlingStrategy.KEEP or not intermediate_orphans 

386 ): 

387 return jsonify({"status": "success", "affected_entities": []}) 

388 

389 # Format entities for display 

390 def format_entities(entities, is_intermediate=False): 

391 return [ 

392 { 

393 "uri": entity["uri"], 

394 "label": custom_filter.human_readable_entity( 

395 entity["uri"], (entity["type"], entity_shape) 

396 ), 

397 "type": custom_filter.human_readable_class( 

398 (entity["type"], entity_shape) 

399 ), 

400 "is_intermediate": is_intermediate, 

401 } 

402 for entity in entities 

403 ] 

404 

405 # Create a unified list of affected entities 

406 affected_entities = format_entities(orphans) + format_entities( 

407 intermediate_orphans, is_intermediate=True 

408 ) 

409 

410 # Determine if we should automatically delete entities 

411 should_delete_orphans = orphan_strategy == OrphanHandlingStrategy.DELETE 

412 should_delete_proxies = proxy_strategy == ProxyHandlingStrategy.DELETE 

413 

414 # If both strategies are DELETE, we can automatically delete everything 

415 if should_delete_orphans and should_delete_proxies: 

416 return jsonify( 

417 { 

418 "status": "success", 

419 "affected_entities": affected_entities, 

420 "should_delete": True, 

421 "orphan_strategy": orphan_strategy.value, 

422 "proxy_strategy": proxy_strategy.value, 

423 } 

424 ) 

425 

426 # If at least one strategy is ASK, we need to ask the user 

427 return jsonify( 

428 { 

429 "status": "success", 

430 "affected_entities": affected_entities, 

431 "should_delete": False, 

432 "orphan_strategy": orphan_strategy.value, 

433 "proxy_strategy": proxy_strategy.value, 

434 } 

435 ) 

436 except ValueError as e: 

437 # Handle validation errors specifically 

438 error_message = str(e) 

439 current_app.logger.warning( 

440 f"Validation error in check_orphans: {error_message}" 

441 ) 

442 return ( 

443 jsonify( 

444 { 

445 "status": "error", 

446 "error_type": "validation", 

447 "message": gettext( 

448 "An error occurred while checking for orphaned entities" 

449 ), 

450 } 

451 ), 

452 400, 

453 ) 

454 except Exception as e: 

455 # Handle other errors 

456 error_message = f"Error checking orphans: {str(e)}" 

457 current_app.logger.error(f"{error_message}\n{traceback.format_exc()}") 

458 return ( 

459 jsonify( 

460 { 

461 "status": "error", 

462 "error_type": "system", 

463 "message": gettext( 

464 "An error occurred while checking for orphaned entities" 

465 ), 

466 } 

467 ), 

468 500, 

469 ) 

470 

471 

472@api_bp.route("/apply_changes", methods=["POST"]) 

473@login_required 

474def apply_changes(): 

475 """Apply changes to entities. 

476 

477 Request body: 

478 { 

479 "subject": (str) Main entity URI being modified, 

480 "changes": (list) List of changes to apply, 

481 "primary_source": (str) Primary source to use for provenance, 

482 "save_default_source": (bool) Whether to save primary_source as default for current user, 

483 "affected_entities": (list) Entities potentially affected by delete operations, 

484 "delete_affected": (bool) Whether to delete affected entities 

485 } 

486 

487 Responses: 

488 200 OK: Changes applied successfully 

489 400 Bad Request: Invalid request or validation error 

490 500 Internal Server Error: Server error while applying changes 

491 """ 

492 try: 

493 changes = request.get_json() 

494 

495 if not changes: 

496 return jsonify({"error": "No request data provided"}), 400 

497 

498 first_change = changes[0] if changes else {} 

499 subject = first_change.get("subject") 

500 affected_entities = first_change.get("affected_entities", []) 

501 delete_affected = first_change.get("delete_affected", False) 

502 primary_source = first_change.get("primary_source") 

503 save_default_source = first_change.get("save_default_source", False) 

504 

505 if primary_source and not validators.url(primary_source): 

506 return jsonify({"error": "Invalid primary source URL"}), 400 

507 

508 if save_default_source and primary_source and validators.url(primary_source): 

509 save_user_default_primary_source(current_user.orcid, primary_source) 

510 

511 deleted_entities = set() 

512 editor = Editor( 

513 get_dataset_endpoint(), 

514 get_provenance_endpoint(), 

515 current_app.config["COUNTER_HANDLER"], 

516 URIRef(f"https://orcid.org/{current_user.orcid}"), 

517 current_app.config["PRIMARY_SOURCE"], 

518 current_app.config["DATASET_GENERATION_TIME"], 

519 dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"], 

520 ) 

521 

522 if primary_source and validators.url(primary_source): 

523 editor.set_primary_source(primary_source) 

524 

525 has_entity_deletion = any( 

526 change["action"] == "delete" and not change.get("predicate") 

527 for change in changes 

528 ) 

529 

530 editor = import_entity_graph( 

531 editor, 

532 subject, 

533 include_referencing_entities=has_entity_deletion 

534 ) 

535 editor.preexisting_finished() 

536 

537 graph_uri = None 

538 if editor.dataset_is_quadstore: 

539 for quad in editor.g_set.quads((URIRef(subject), None, None, None)): 

540 graph_context = quad[3] 

541 graph_uri = get_graph_uri_from_context(graph_context) 

542 break 

543 

544 temp_id_to_uri = {} 

545 for change in changes: 

546 if change["action"] == "create": 

547 data = change.get("data") 

548 if data: 

549 subject = create_logic( 

550 editor, 

551 data, 

552 subject, 

553 graph_uri, 

554 temp_id_to_uri=temp_id_to_uri, 

555 parent_entity_type=None, 

556 ) 

557 

558 orphan_strategy = current_app.config.get( 

559 "ORPHAN_HANDLING_STRATEGY", OrphanHandlingStrategy.KEEP 

560 ) 

561 proxy_strategy = current_app.config.get( 

562 "PROXY_HANDLING_STRATEGY", ProxyHandlingStrategy.KEEP 

563 ) 

564 # Separiamo le operazioni di delete in due fasi: 

565 # 1. Prima eliminiamo tutte le entità orfane/intermedie 

566 # 2. Poi eliminiamo le triple specifiche 

567 

568 # Fase 1: Elimina le entità orfane/intermedie 

569 if affected_entities and delete_affected: 

570 # Separa gli orfani dalle entità proxy 

571 orphans = [entity for entity in affected_entities if not entity.get("is_intermediate")] 

572 proxies = [entity for entity in affected_entities if entity.get("is_intermediate")] 

573 

574 # Gestione degli orfani secondo la strategia per gli orfani 

575 should_delete_orphans = ( 

576 orphan_strategy == OrphanHandlingStrategy.DELETE 

577 or (orphan_strategy == OrphanHandlingStrategy.ASK and delete_affected) 

578 ) 

579 

580 if should_delete_orphans and orphans: 

581 for orphan in orphans: 

582 orphan_uri = orphan["uri"] 

583 if orphan_uri in deleted_entities: 

584 continue 

585 

586 delete_logic(editor, orphan_uri, graph_uri=graph_uri) 

587 deleted_entities.add(orphan_uri) 

588 

589 # Gestione delle entità proxy secondo la strategia per i proxy 

590 should_delete_proxies = ( 

591 proxy_strategy == ProxyHandlingStrategy.DELETE 

592 or (proxy_strategy == ProxyHandlingStrategy.ASK and delete_affected) 

593 ) 

594 

595 if should_delete_proxies and proxies: 

596 for proxy in proxies: 

597 proxy_uri = proxy["uri"] 

598 if proxy_uri in deleted_entities: 

599 continue 

600 

601 delete_logic(editor, proxy_uri, graph_uri=graph_uri) 

602 deleted_entities.add(proxy_uri) 

603 

604 # Fase 2: Processa tutte le altre modifiche 

605 for change in changes: 

606 if change["action"] == "delete": 

607 subject_uri = change["subject"] 

608 predicate = change.get("predicate") 

609 object_value = change.get("object") 

610 

611 # Se stiamo eliminando un'intera entità 

612 if not predicate: 

613 if subject_uri in deleted_entities: 

614 continue 

615 

616 delete_logic(editor, subject_uri, graph_uri=graph_uri, entity_type=change.get("entity_type")) 

617 deleted_entities.add(subject_uri) 

618 # Se stiamo eliminando una tripla specifica 

619 elif object_value: 

620 # Controlla se l'oggetto è un'entità che è già stata eliminata 

621 if object_value in deleted_entities: 

622 continue 

623 

624 delete_logic(editor, subject_uri, predicate, object_value, graph_uri, change.get("entity_type")) 

625 

626 # La gestione degli orfani e dei proxy è stata spostata all'inizio del ciclo 

627 

628 elif change["action"] == "update": 

629 update_logic( 

630 editor, 

631 change["subject"], 

632 change["predicate"], 

633 change["object"], 

634 change["newObject"], 

635 graph_uri, 

636 change.get("entity_type"), 

637 ) 

638 elif change["action"] == "order": 

639 order_logic( 

640 editor, 

641 change["subject"], 

642 change["predicate"], 

643 change["object"], 

644 change["newObject"], 

645 graph_uri, 

646 temp_id_to_uri, 

647 ) 

648 

649 try: 

650 editor.save() 

651 except ValueError as ve: 

652 # Re-raise ValueError so it can be caught by the outer try-except block 

653 current_app.logger.error(f"Error during save operation: {str(ve)}") 

654 raise 

655 except Exception as save_error: 

656 current_app.logger.error(f"Error during save operation: {str(save_error)}") 

657 return jsonify( 

658 { 

659 "status": "error", 

660 "error_type": "database", 

661 "message": gettext("Failed to save changes to the database: {}").format(str(save_error)), 

662 } 

663 ), 500 

664 

665 return ( 

666 jsonify( 

667 { 

668 "status": "success", 

669 "message": gettext("Changes applied successfully"), 

670 } 

671 ), 

672 200, 

673 ) 

674 

675 except ValueError as e: 

676 # Handle validation errors specifically 

677 error_message = str(e) 

678 current_app.logger.warning(f"Validation error: {error_message}") 

679 return ( 

680 jsonify( 

681 { 

682 "status": "error", 

683 "error_type": "validation", 

684 "message": error_message, 

685 } 

686 ), 

687 400, 

688 ) 

689 except Exception as e: 

690 # Handle other errors 

691 error_message = ( 

692 f"Error while applying changes: {str(e)}\n{traceback.format_exc()}" 

693 ) 

694 current_app.logger.error(error_message) 

695 return ( 

696 jsonify( 

697 { 

698 "status": "error", 

699 "error_type": "system", 

700 "message": gettext("An error occurred while applying changes"), 

701 } 

702 ), 

703 500, 

704 ) 

705 

706 

707def get_graph_uri_from_context(graph_context): 

708 """Extract the graph URI from a graph context. 

709  

710 Args: 

711 graph_context: Either a Graph object or a direct URI reference 

712  

713 Returns: 

714 The graph URI 

715 """ 

716 if isinstance(graph_context, Graph): 

717 return graph_context.identifier 

718 else: 

719 return graph_context 

720 

721 

722def determine_datatype(value, datatype_uris): 

723 for datatype_uri in datatype_uris: 

724 validation_func = next( 

725 (d[1] for d in DATATYPE_MAPPING if str(d[0]) == str(datatype_uri)), None 

726 ) 

727 if validation_func and validation_func(value): 

728 return URIRef(datatype_uri) 

729 # If none match, default to XSD.string 

730 return XSD.string 

731 

732 

733def create_logic( 

734 editor: Editor, 

735 data: Dict[str, dict], 

736 subject=None, 

737 graph_uri=None, 

738 parent_subject=None, 

739 parent_predicate=None, 

740 temp_id_to_uri=None, 

741 parent_entity_type=None, 

742): 

743 entity_type = data.get("entity_type") 

744 properties = data.get("properties", {}) 

745 temp_id = data.get("tempId") 

746 

747 if subject is None: 

748 subject = generate_unique_uri(entity_type) 

749 

750 if temp_id and temp_id_to_uri is not None: 

751 temp_id_to_uri[temp_id] = str(subject) 

752 

753 # Create the entity type using validate_new_triple 

754 if parent_subject is not None: 

755 type_value, _, error_message = validate_new_triple( 

756 subject, RDF.type, entity_type, "create", entity_types=entity_type 

757 ) 

758 if error_message: 

759 raise ValueError(error_message) 

760 

761 if type_value is not None: 

762 editor.create(URIRef(subject), RDF.type, type_value, graph_uri) 

763 

764 # Create the relationship to the parent using validate_new_triple 

765 if parent_subject and parent_predicate: 

766 # When creating a relationship, we need to validate that the parent can have this relationship 

767 # with an entity of our type. Pass our entity_type as the object_entity_type for validation 

768 parent_value, _, error_message = validate_new_triple( 

769 parent_subject, 

770 parent_predicate, 

771 subject, 

772 "create", 

773 entity_types=parent_entity_type, 

774 ) 

775 if error_message: 

776 raise ValueError(error_message) 

777 

778 if parent_value is not None: 

779 editor.create( 

780 URIRef(parent_subject), 

781 URIRef(parent_predicate), 

782 parent_value, 

783 graph_uri, 

784 ) 

785 

786 for predicate, values in properties.items(): 

787 if not isinstance(values, list): 

788 values = [values] 

789 for value in values: 

790 if isinstance(value, dict) and "entity_type" in value: 

791 # For nested entities, create them first 

792 nested_subject = generate_unique_uri(value["entity_type"]) 

793 create_logic( 

794 editor, 

795 value, 

796 nested_subject, 

797 graph_uri, 

798 subject, 

799 predicate, 

800 temp_id_to_uri, 

801 parent_entity_type=entity_type, # Pass the current entity type as parent_entity_type 

802 ) 

803 else: 

804 # Use validate_new_triple to validate and get the correctly typed value 

805 object_value, _, error_message = validate_new_triple( 

806 subject, predicate, value, "create", entity_types=entity_type 

807 ) 

808 if error_message: 

809 raise ValueError(error_message) 

810 

811 if object_value is not None: 

812 editor.create( 

813 URIRef(subject), URIRef(predicate), object_value, graph_uri 

814 ) 

815 

816 return subject 

817 

818 

819def update_logic( 

820 editor: Editor, 

821 subject, 

822 predicate, 

823 old_value, 

824 new_value, 

825 graph_uri=None, 

826 entity_type=None, 

827): 

828 new_value, old_value, error_message = validate_new_triple( 

829 subject, predicate, new_value, "update", old_value, entity_types=entity_type 

830 ) 

831 if error_message: 

832 raise ValueError(error_message) 

833 

834 editor.update(URIRef(subject), URIRef(predicate), old_value, new_value, graph_uri) 

835 

836 

837def rebuild_entity_order( 

838 editor: Editor, 

839 ordered_by_uri: URIRef, 

840 entities: list, 

841 graph_uri=None 

842): 

843 """ 

844 Rebuild the ordering chain for a list of entities. 

845  

846 Args: 

847 editor: The editor instance 

848 ordered_by_uri: The property used for ordering 

849 entities: List of entities to be ordered 

850 graph_uri: Optional graph URI 

851 """ 

852 # First, remove all existing ordering relationships 

853 for entity in entities: 

854 for s, p, o in list(editor.g_set.triples((entity, ordered_by_uri, None))): 

855 editor.delete(entity, ordered_by_uri, o, graph_uri) 

856 

857 # Then rebuild the chain with the entities 

858 for i in range(len(entities) - 1): 

859 current_entity = entities[i] 

860 next_entity = entities[i + 1] 

861 editor.create(current_entity, ordered_by_uri, next_entity, graph_uri) 

862 

863 return editor 

864 

865 

866def delete_logic( 

867 editor: Editor, 

868 subject, 

869 predicate=None, 

870 object_value=None, 

871 graph_uri=None, 

872 entity_type=None, 

873): 

874 # Ensure we have the correct data types for all values 

875 subject_uri = URIRef(subject) 

876 predicate_uri = URIRef(predicate) if predicate else None 

877 

878 # Validate and get correctly typed object value if we have a predicate 

879 if predicate and object_value: 

880 # Use validate_new_triple to validate the deletion and get the correctly typed object 

881 _, object_value, error_message = validate_new_triple( 

882 subject, predicate, None, "delete", object_value, entity_types=entity_type 

883 ) 

884 if error_message: 

885 raise ValueError(error_message) 

886 

887 editor.delete(subject_uri, predicate_uri, object_value, graph_uri) 

888 

889 

890def order_logic( 

891 editor: Editor, 

892 subject, 

893 predicate, 

894 new_order, 

895 ordered_by, 

896 graph_uri=None, 

897 temp_id_to_uri: Optional[Dict] = None, 

898): 

899 subject_uri = URIRef(subject) 

900 predicate_uri = URIRef(predicate) 

901 ordered_by_uri = URIRef(ordered_by) 

902 # Ottieni tutte le entità ordinate attuali direttamente dall'editor 

903 current_entities = [ 

904 o for _, _, o in editor.g_set.triples((subject_uri, predicate_uri, None)) 

905 ] 

906 

907 # Dizionario per mappare le vecchie entità alle nuove 

908 old_to_new_mapping = {} 

909 

910 # Per ogni entità attuale 

911 for old_entity in current_entities: 

912 if str(old_entity) in new_order: # Processa solo le entità preesistenti 

913 # Memorizza tutte le proprietà dell'entità attuale 

914 entity_properties = list(editor.g_set.triples((old_entity, None, None))) 

915 

916 entity_type = next( 

917 (o for _, p, o in entity_properties if p == RDF.type), None 

918 ) 

919 

920 if entity_type is None: 

921 raise ValueError( 

922 f"Impossibile determinare il tipo dell'entità per {old_entity}" 

923 ) 

924 

925 # Crea una nuova entità 

926 new_entity_uri = generate_unique_uri(entity_type) 

927 old_to_new_mapping[old_entity] = new_entity_uri 

928 

929 # Cancella la vecchia entità 

930 editor.delete(subject_uri, predicate_uri, old_entity, graph_uri) 

931 editor.delete(old_entity, graph=graph_uri) 

932 

933 # Ricrea il collegamento tra il soggetto principale e la nuova entità 

934 editor.create(subject_uri, predicate_uri, new_entity_uri, graph_uri) 

935 

936 # Ripristina tutte le altre proprietà per la nuova entità 

937 for _, p, o in entity_properties: 

938 if p != predicate_uri and p != ordered_by_uri: 

939 editor.create(new_entity_uri, p, o, graph_uri) 

940 

941 # Prepara la lista delle entità nel nuovo ordine 

942 ordered_entities = [] 

943 for entity in new_order: 

944 new_entity_uri = old_to_new_mapping.get(URIRef(entity)) 

945 if not new_entity_uri: 

946 new_entity_uri = URIRef(temp_id_to_uri.get(entity, entity)) 

947 ordered_entities.append(new_entity_uri) 

948 

949 # Ricostruisci l'ordine 

950 if ordered_entities: 

951 rebuild_entity_order(editor, ordered_by_uri, ordered_entities, graph_uri) 

952 

953 return editor 

954 

955 

956@api_bp.route("/human-readable-entity", methods=["POST"]) 

957@login_required 

958def get_human_readable_entity(): 

959 custom_filter = get_custom_filter() 

960 

961 # Check if required parameters are present 

962 if "uri" not in request.form or "entity_class" not in request.form: 

963 return jsonify({"status": "error", "message": "Missing required parameters"}), 400 

964 

965 uri = request.form["uri"] 

966 entity_class = request.form["entity_class"] 

967 shape = determine_shape_for_classes([entity_class]) 

968 filter_instance = custom_filter 

969 readable = filter_instance.human_readable_entity(uri, (entity_class, shape)) 

970 return readable 

971 

972 

973@api_bp.route('/format-source', methods=['POST']) 

974@login_required 

975def format_source_api(): 

976 """ 

977 API endpoint to format a source URL using the application's filters. 

978 Accepts POST request with JSON body: {"url": "source_url"} 

979 Returns JSON: {"formatted_html": "html_string"} 

980 """ 

981 data = request.get_json() 

982 source_url = data.get('url') 

983 

984 if not source_url or not validators.url(source_url): 

985 return jsonify({"error": gettext("Invalid or missing URL")}), 400 

986 

987 try: 

988 custom_filter = get_custom_filter() 

989 formatted_html = custom_filter.format_source_reference(source_url) 

990 return jsonify({"formatted_html": formatted_html}) 

991 except Exception as e: 

992 current_app.logger.error(f"Error formatting source URL '{source_url}': {e}") 

993 fallback_html = f'<a href="{source_url}" target="_blank">{source_url}</a>' 

994 return jsonify({"formatted_html": fallback_html})