Coverage for heritrace/routes/entity.py: 89%

776 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-10-13 17:12 +0000

1import json 

2import re 

3from datetime import datetime 

4from typing import List, Optional, Tuple 

5 

6import validators 

7from flask import (Blueprint, abort, current_app, flash, jsonify, redirect, 

8 render_template, request, url_for) 

9from flask_babel import gettext 

10from flask_login import current_user, login_required 

11from rdflib import RDF, XSD, ConjunctiveGraph, Graph, Literal, URIRef 

12from SPARQLWrapper import JSON 

13from time_agnostic_library.agnostic_entity import AgnosticEntity 

14 

15from heritrace.apis.orcid import get_responsible_agent_uri 

16from heritrace.editor import Editor 

17from heritrace.extensions import (get_change_tracking_config, 

18 get_custom_filter, get_dataset_endpoint, 

19 get_dataset_is_quadstore, get_display_rules, 

20 get_form_fields, get_provenance_endpoint, 

21 get_provenance_sparql, get_shacl_graph, 

22 get_sparql) 

23from heritrace.forms import * 

24from heritrace.utils.converters import convert_to_datetime 

25from heritrace.utils.datatypes import DATATYPE_MAPPING, get_datatype_options 

26from heritrace.utils.display_rules_utils import ( 

27 get_class_priority, get_grouped_triples, get_highest_priority_class, 

28 get_predicate_ordering_info, get_property_order_from_rules, 

29 get_shape_order_from_display_rules, is_entity_type_visible) 

30from heritrace.utils.filters import Filter 

31from heritrace.utils.primary_source_utils import ( 

32 get_default_primary_source, save_user_default_primary_source) 

33from heritrace.utils.shacl_utils import (determine_shape_for_entity_triples, 

34 find_matching_form_field, 

35 get_entity_position_in_sequence) 

36from heritrace.utils.shacl_validation import get_valid_predicates 

37from heritrace.utils.sparql_utils import ( 

38 determine_shape_for_classes, fetch_current_state_with_related_entities, 

39 fetch_data_graph_for_subject, get_entity_types, import_referenced_entities, 

40 parse_sparql_update) 

41from heritrace.utils.uri_utils import generate_unique_uri 

42from heritrace.utils.virtual_properties import \ 

43 get_virtual_properties_for_entity, \ 

44 transform_entity_creation_with_virtual_properties, \ 

45 remove_virtual_properties_from_creation_data 

46 

def _prepare_entity_creation_data(structured_data):
    """
    Strip virtual properties from the creation payload and pull out the
    pieces needed to mint the new entity.

    Returns:
        Tuple of (cleaned_structured_data, entity_type, properties, entity_uri)
    """
    cleaned = remove_virtual_properties_from_creation_data(structured_data)
    new_entity_type = cleaned.get("entity_type")
    new_properties = cleaned.get("properties", {})
    # Mint a fresh URI for the entity based on its declared type.
    new_uri = generate_unique_uri(new_entity_type)
    return cleaned, new_entity_type, new_properties, new_uri

60 

61 

def _setup_editor_for_creation(editor, cleaned_structured_data):
    """
    Prime the editor before creating a new entity.

    First pulls any entities referenced by the payload into the editor's
    working set, then marks everything currently loaded as preexisting so
    that only subsequently added triples are treated as new.
    """
    import_referenced_entities(editor, cleaned_structured_data)
    editor.preexisting_finished()

68 

69 

def _process_virtual_properties_after_creation(editor, structured_data, entity_uri, default_graph_uri):
    """
    Create and persist any entities derived from virtual properties once the
    main entity has been saved. No-op when the payload yields none.
    """
    derived_entities = transform_entity_creation_with_virtual_properties(structured_data, str(entity_uri))
    if not derived_entities:
        return

    for derived in derived_entities:
        derived_uri = generate_unique_uri(derived.get("entity_type"))
        create_nested_entity(editor, derived_uri, derived, default_graph_uri)

    # Persist the virtual entities in a second save, after the main entity.
    editor.save()

83 

84 

# Blueprint collecting the entity-centric routes defined in this module.
entity_bp = Blueprint("entity", __name__)

86 

87 

def get_deleted_entity_context_info(is_deleted: bool, sorted_timestamps: List[str],
                                    history: dict, subject: str) -> Tuple[Optional[Graph], Optional[str], Optional[str]]:
    """
    Extract context information for deleted entities with multiple timestamps.

    For a deleted entity that has more than one snapshot, the second-to-last
    snapshot (the last state before deletion) is used as context, and the
    entity's highest-priority class and matching shape are derived from it.

    Args:
        is_deleted: Whether the entity is deleted
        sorted_timestamps: List of timestamps in chronological order
        history: Dictionary mapping subject -> timestamp -> Graph
        subject: The entity URI as string

    Returns:
        Tuple of (context_snapshot, highest_priority_class, entity_shape),
        or (None, None, None) when the entity is not deleted or has a
        single snapshot only.
    """
    if not is_deleted or len(sorted_timestamps) <= 1:
        return None, None, None

    snapshot = history[subject][sorted_timestamps[-2]]
    subject_ref = URIRef(subject)

    class_uris = [obj for _, _, obj in snapshot.triples((subject_ref, RDF.type, None))]
    priority_class = get_highest_priority_class(class_uris)
    shape = determine_shape_for_entity_triples(
        list(snapshot.triples((subject_ref, None, None)))
    )

    return snapshot, priority_class, shape

125 

126 

@entity_bp.route("/about/<path:subject>")
@login_required
def about(subject):
    """
    Display detailed information about an entity.

    Renders the entity's current triples (grouped for display) together with
    its editing affordances (addable/deletable predicates, datatypes,
    mandatory/optional values). For deleted entities, a context snapshot from
    before the deletion is shown instead of live data.

    Args:
        subject: URI of the entity to display
    """
    change_tracking_config = get_change_tracking_config()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # Full snapshot history plus provenance metadata for this subject only
    # (related/merged/reverse entities are excluded here).
    agnostic_entity = AgnosticEntity(
        res=subject, config=change_tracking_config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    is_deleted = False
    context_snapshot = None
    subject_classes = []
    highest_priority_class = None
    entity_shape = None

    if history.get(subject):
        sorted_timestamps = sorted(history[subject].keys())
        # Provenance record whose generation time matches the newest snapshot.
        latest_metadata = next(
            (
                meta
                for _, meta in provenance[subject].items()
                if meta["generatedAtTime"] == sorted_timestamps[-1]
            ),
            None,
        )

        # A populated invalidatedAtTime on the latest snapshot marks deletion.
        is_deleted = (
            latest_metadata
            and "invalidatedAtTime" in latest_metadata
            and latest_metadata["invalidatedAtTime"]
        )

        context_snapshot, highest_priority_class, entity_shape = get_deleted_entity_context_info(
            is_deleted, sorted_timestamps, history, subject
        )

    grouped_triples = {}
    can_be_added = []
    can_be_deleted = []
    datatypes = {}
    mandatory_values = {}
    optional_values = {}
    valid_predicates = []
    data_graph = None

    # Live data is only fetched for non-deleted entities.
    if not is_deleted:
        data_graph = fetch_data_graph_for_subject(subject)

    # Check if entity exists - if no history and no data_graph, entity doesn't exist
    if not history.get(subject) and (not data_graph or len(data_graph) == 0):
        abort(404)

    if data_graph:
        triples = list(data_graph.triples((None, None, None)))
        subject_classes = [o for s, p, o in data_graph.triples((URIRef(subject), RDF.type, None))]

        highest_priority_class = get_highest_priority_class(subject_classes)
        entity_shape = determine_shape_for_entity_triples(
            list(data_graph.triples((URIRef(subject), None, None)))
        )

        (
            can_be_added,
            can_be_deleted,
            datatypes,
            mandatory_values,
            optional_values,
            valid_predicates,
        ) = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

        grouped_triples, relevant_properties = get_grouped_triples(
            subject, triples, valid_predicates, highest_priority_class=highest_priority_class, highest_priority_shape=entity_shape
        )

        virtual_properties = get_virtual_properties_for_entity(highest_priority_class, entity_shape)

        # Restrict to display-relevant predicates, then append the first
        # element of each virtual-property tuple (presumably its predicate
        # URI — TODO confirm against get_virtual_properties_for_entity).
        can_be_added = [uri for uri in can_be_added if uri in relevant_properties] + [vp[0] for vp in virtual_properties]
        can_be_deleted = [
            uri for uri in can_be_deleted if uri in relevant_properties
        ] + [vp[0] for vp in virtual_properties]

    update_form = UpdateTripleForm()

    form_fields = get_form_fields()

    datatype_options = get_datatype_options()

    # Index field definitions by (predicate, entity key, node shape) so the
    # template can look them up directly.
    predicate_details_map = {}
    for entity_type_key, predicates in form_fields.items():
        for predicate_uri, details_list in predicates.items():
            for details in details_list:
                shape = details.get("nodeShape")
                key = (predicate_uri, entity_type_key, shape)
                predicate_details_map[key] = details

    return render_template(
        "entity/about.jinja",
        subject=subject,
        history=history,
        can_be_added=can_be_added,
        can_be_deleted=can_be_deleted,
        datatypes=datatypes,
        update_form=update_form,
        mandatory_values=mandatory_values,
        optional_values=optional_values,
        shacl=bool(len(get_shacl_graph())),
        grouped_triples=grouped_triples,
        display_rules=get_display_rules(),
        form_fields=form_fields,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        predicate_details_map=predicate_details_map,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        is_deleted=is_deleted,
        context=context_snapshot,
        default_primary_source=default_primary_source,
        datatype_options=datatype_options,
    )

257 

258 

@entity_bp.route("/create-entity", methods=["GET", "POST"])
@login_required
def create_entity():
    """
    Create a new entity in the dataset.

    GET renders the creation form. POST expects a JSON "structured_data"
    form field plus an optional primary source URL, validates the payload
    (against SHACL-derived form fields when available), stages the triples
    through an Editor, saves, and finally processes virtual properties.

    Returns:
        GET: rendered create_entity template.
        POST: JSON {"status": ...} with 200 on success, 400 on validation
        failure, 500 on save errors.
    """
    form_fields = get_form_fields()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # Creatable (class, shape) keys, highest-priority classes first.
    entity_class_shape_pairs = sorted(
        [
            entity_key
            for entity_key in form_fields.keys()
            if is_entity_type_visible(entity_key)
        ],
        key=lambda et: get_class_priority(et),
        reverse=True,
    )

    datatype_options = get_datatype_options()

    if request.method == "POST":
        structured_data = json.loads(request.form.get("structured_data", "{}"))
        primary_source = request.form.get("primary_source") or None
        save_default_source = request.form.get("save_default_source") == 'true'

        if primary_source and not validators.url(primary_source):
            return jsonify({"status": "error", "errors": [gettext("Invalid primary source URL provided")]}), 400

        # Optionally remember this primary source as the user's default.
        if save_default_source and primary_source and validators.url(primary_source):
            save_user_default_primary_source(current_user.orcid, primary_source)

        editor = Editor(
            get_dataset_endpoint(),
            get_provenance_endpoint(),
            current_app.config["COUNTER_HANDLER"],
            URIRef(get_responsible_agent_uri(current_user.orcid)),
            primary_source,
            current_app.config["DATASET_GENERATION_TIME"],
            dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        )

        if not structured_data.get("entity_type"):
            return jsonify({"status": "error", "errors": [gettext("Entity type is required")]}), 400

        # Prepare common data for entity creation
        cleaned_structured_data, entity_type, properties, entity_uri = _prepare_entity_creation_data(structured_data)

        # On quadstores each entity gets its own named graph.
        default_graph_uri = (
            URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
        )

        if form_fields:
            # SHACL-backed path: validate against the form-field definitions.
            validation_errors = validate_entity_data(cleaned_structured_data)
            if validation_errors:
                return jsonify({"status": "error", "errors": validation_errors}), 400

            _setup_editor_for_creation(editor, cleaned_structured_data)

            for predicate, values in properties.items():
                if not isinstance(values, list):
                    values = [values]

                entity_shape = cleaned_structured_data.get("entity_shape")
                matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

                field_definitions = form_fields.get(matching_key, {}).get(predicate, []) if matching_key else []

                # Get the shape from the property value if available
                property_shape = None
                if values and isinstance(values[0], dict):
                    property_shape = values[0].get("shape")

                # Filter field definitions to find the matching one based on shape
                matching_field_def = None
                for field_def in field_definitions:
                    if property_shape:
                        # If property has a shape, match it with the field definition's subjectShape
                        if field_def.get("subjectShape") == property_shape:
                            matching_field_def = field_def
                            break
                    else:
                        # If no shape specified, use the first field definition without a shape requirement
                        if not field_def.get("subjectShape"):
                            matching_field_def = field_def
                            break

                # If no matching field definition found, use the first one (default behavior)
                if not matching_field_def and field_definitions:
                    matching_field_def = field_definitions[0]

                ordered_by = (
                    matching_field_def.get("orderedBy") if matching_field_def else None
                )

                if ordered_by:
                    process_ordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, ordered_by
                    )
                else:
                    # Handle unordered properties
                    process_unordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, matching_field_def
                    )
        else:
            # No SHACL form fields: write the payload's triples directly.
            # NOTE(review): importing a freshly generated URI looks unusual —
            # confirm whether import_entity is needed for brand-new entities.
            editor.import_entity(entity_uri)
            _setup_editor_for_creation(editor, cleaned_structured_data)

            editor.create(
                entity_uri,
                RDF.type,
                URIRef(entity_type),
                default_graph_uri,
            )

            for predicate, values in properties.items():
                for value_dict in values:
                    if value_dict["type"] == "uri":
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            URIRef(value_dict["value"]),
                            default_graph_uri,
                        )
                    elif value_dict["type"] == "literal":
                        datatype = (
                            URIRef(value_dict["datatype"])
                            if "datatype" in value_dict
                            else XSD.string
                        )
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            Literal(value_dict["value"], datatype=datatype),
                            default_graph_uri,
                        )

        try:
            # Save the main entity first
            editor.save()

            # Process virtual properties after creation
            _process_virtual_properties_after_creation(editor, structured_data, entity_uri, default_graph_uri)

            response = jsonify(
                {
                    "status": "success",
                    "redirect_url": url_for("entity.about", subject=str(entity_uri)),
                }
            )
            flash(gettext("Entity created successfully"), "success")
            return response, 200
        except Exception as e:
            error_message = gettext(
                "An error occurred while creating the entity: %(error)s", error=str(e)
            )
            return jsonify({"status": "error", "errors": [error_message]}), 500

    return render_template(
        "create_entity.jinja",
        datatype_options=datatype_options,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        default_primary_source=default_primary_source,
        shacl=bool(form_fields),
        entity_class_shape_pairs=entity_class_shape_pairs
    )

429 

430 

def create_nested_entity(
    editor: Editor, entity_uri, entity_data, graph_uri=None
):
    """
    Recursively materialize a nested entity described by entity_data.

    Asserts the rdf:type triple, then walks the "properties" mapping:
    dict values carrying "entity_type" spawn further entities (optionally
    through an intermediate relation node), dicts flagged
    "is_existing_entity" are linked by their URI, and plain values are
    stored as URI references or typed literals.

    Args:
        editor: Editor used to stage the triples.
        entity_uri: URI to assign to the entity being created.
        entity_data: Dict with "entity_type", optional "entity_shape", and a
            "properties" mapping of predicate URI -> value or list of values.
        graph_uri: Optional named-graph URI for quadstore datasets.
    """
    form_fields = get_form_fields()

    editor.create(
        entity_uri,
        URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
        URIRef(entity_data["entity_type"]),
        graph_uri,
    )

    entity_type = entity_data.get("entity_type")
    entity_shape = entity_data.get("entity_shape")
    properties = entity_data.get("properties", {})

    matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    if not matching_key:
        # No field definitions for this type/shape: only the type triple is
        # written and the remaining properties are skipped.
        return

    # Add other properties
    for predicate, values in properties.items():
        if not isinstance(values, list):
            values = [values]
        field_definitions = form_fields[matching_key].get(predicate, [])

        for value in values:
            if isinstance(value, dict) and "entity_type" in value:
                if "intermediateRelation" in value:
                    # Link through an intermediate node:
                    # entity --predicate--> intermediate --property--> target
                    intermediate_uri = generate_unique_uri(
                        value["intermediateRelation"]["class"]
                    )
                    target_uri = generate_unique_uri(value["entity_type"])
                    editor.create(
                        entity_uri, URIRef(predicate), intermediate_uri, graph_uri
                    )
                    editor.create(
                        intermediate_uri,
                        URIRef(value["intermediateRelation"]["property"]),
                        target_uri,
                        graph_uri,
                    )
                    create_nested_entity(
                        editor, target_uri, value, graph_uri
                    )
                else:
                    # Handle nested entities
                    nested_uri = generate_unique_uri(value["entity_type"])
                    editor.create(entity_uri, URIRef(predicate), nested_uri, graph_uri)
                    create_nested_entity(
                        editor, nested_uri, value, graph_uri
                    )
            elif isinstance(value, dict) and value.get("is_existing_entity", False):
                # Reference to an already-existing entity; silently skipped
                # when the reference lacks its URI.
                existing_entity_uri = value.get("entity_uri")
                if existing_entity_uri:
                    editor.create(entity_uri, URIRef(predicate), URIRef(existing_entity_uri), graph_uri)
            else:
                # Handle simple properties - check if it's a URI or literal
                if validators.url(str(value)):
                    object_value = URIRef(value)
                else:
                    datatype = XSD.string  # Default to string if not specified
                    datatype_uris = []
                    if field_definitions:
                        datatype_uris = field_definitions[0].get("datatypes", [])
                    datatype = determine_datatype(value, datatype_uris)
                    object_value = Literal(value, datatype=datatype)
                editor.create(entity_uri, URIRef(predicate), object_value, graph_uri)

500 

501 

def process_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri, matching_field_def):
    """
    Attach a single value to entity_uri under predicate.

    Dict values carrying "entity_type" spawn a brand-new nested entity;
    dicts flagged "is_existing_entity" are linked by their "entity_uri";
    anything else is written as a plain URI reference or typed literal.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        value: Value to process (dict or primitive)
        default_graph_uri: Default graph URI for quad stores
        matching_field_def: Field definition for datatype validation

    Returns:
        The URIRef (or Literal, for simple values) that was written.

    Raises:
        ValueError: when an existing-entity reference lacks "entity_uri".
    """
    value_is_dict = isinstance(value, dict)

    if value_is_dict and "entity_type" in value:
        # New nested entity: mint a URI, link it, then recurse into its data.
        child_uri = generate_unique_uri(value["entity_type"])
        editor.create(entity_uri, URIRef(predicate), child_uri, default_graph_uri)
        create_nested_entity(editor, child_uri, value, default_graph_uri)
        return child_uri

    if value_is_dict and value.get("is_existing_entity", False):
        referenced_uri = value.get("entity_uri")
        if not referenced_uri:
            raise ValueError("Missing entity_uri in existing entity reference")
        target = URIRef(referenced_uri)
        editor.create(entity_uri, URIRef(predicate), target, default_graph_uri)
        return target

    # Simple value: URL-shaped strings become URI references, the rest
    # become literals typed via the field definition's datatype list.
    if validators.url(str(value)):
        object_value = URIRef(value)
    else:
        allowed_datatypes = matching_field_def.get("datatypes", []) if matching_field_def else []
        object_value = Literal(value, datatype=determine_datatype(value, allowed_datatypes))
    editor.create(entity_uri, URIRef(predicate), object_value, default_graph_uri)
    return object_value

562 

563 

def process_ordered_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri):
    """
    Process a single entity value for ordered properties.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        value: Value to process (dict)
        default_graph_uri: Default graph URI for quad stores

    Returns:
        URIRef: The URI of the created/referenced entity

    Raises:
        ValueError: if the value is neither a new nested entity nor a valid
            existing-entity reference with an "entity_uri".
    """
    if isinstance(value, dict) and "entity_type" in value:
        # New nested entity: mint a URI, link it, and recurse into its data.
        nested_uri = generate_unique_uri(value["entity_type"])
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        create_nested_entity(
            editor,
            nested_uri,
            value,
            default_graph_uri
        )
        return nested_uri
    elif isinstance(value, dict) and value.get("is_existing_entity", False):
        # Reference to an existing entity. BUG FIX: the previous code did
        # URIRef(value) on the whole dict, which stringifies the dict into a
        # garbage URI; link the referenced "entity_uri" instead, consistent
        # with process_entity_value.
        existing_uri = value.get("entity_uri")
        if not existing_uri:
            raise ValueError("Missing entity_uri in existing entity reference")
        nested_uri = URIRef(existing_uri)
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        return nested_uri
    else:
        raise ValueError("Unexpected value type for ordered property")

605 

606 

def process_ordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, ordered_by):
    """
    Create ordered property values, chaining consecutive entities of the
    same shape together with the ordering predicate.

    Values are first grouped by their "entity_shape" (falling back to
    "default_shape"); within each group, every entity is linked to its
    predecessor via ordered_by, preserving input order.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        values: List of values to process
        default_graph_uri: Default graph URI for quad stores
        ordered_by: URI of the ordering property
    """
    grouped = {}
    for item in values:
        bucket = item.get("entity_shape") or "default_shape"
        grouped.setdefault(bucket, []).append(item)

    for shape_items in grouped.values():
        predecessor = None
        for item in shape_items:
            current_uri = process_ordered_entity_value(
                editor, entity_uri, predicate, item, default_graph_uri
            )
            # Link each entity to the one created just before it.
            if predecessor:
                editor.create(
                    predecessor,
                    URIRef(ordered_by),
                    current_uri,
                    default_graph_uri,
                )
            predecessor = current_uri

644 

def process_unordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, matching_field_def):
    """
    Write each value of an unordered property.

    Delegates every item to process_entity_value, which handles nested
    entities, existing-entity references, and simple literals/URIs.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        values: List of values to process
        default_graph_uri: Default graph URI for quad stores
        matching_field_def: Field definition for datatype validation
    """
    for item in values:
        process_entity_value(
            editor, entity_uri, predicate, item, default_graph_uri, matching_field_def
        )

661 

662 

def determine_datatype(value, datatype_uris):
    """
    Return the first datatype URI from datatype_uris whose registered
    validator accepts value; fall back to xsd:string when none match.
    """
    for candidate_uri in datatype_uris:
        # Look up the validator registered for this datatype, if any.
        check = next(
            (entry[1] for entry in DATATYPE_MAPPING if str(entry[0]) == str(candidate_uri)),
            None,
        )
        if check is not None and check(value):
            return URIRef(candidate_uri)
    # If none match, default to XSD.string
    return XSD.string

672 

673 

def validate_entity_data(structured_data):
    """
    Validates entity data against form field definitions, considering shape matching.

    Checks, per property: known-predicate membership, min/max cardinality,
    mandatory values, datatype validity, and optional-value whitelists.
    Nested entity dicts are validated recursively. Finally, properties
    required by the schema but absent from the data are reported.

    Args:
        structured_data (dict): Data to validate containing entity_type and properties

    Returns:
        list: List of validation error messages, empty if validation passes
    """
    custom_filter = get_custom_filter()
    form_fields = get_form_fields()

    errors = []
    entity_type = structured_data.get("entity_type")
    entity_shape = structured_data.get("entity_shape")

    if not entity_type:
        errors.append(gettext("Entity type is required"))
        return errors

    entity_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    if not entity_key:
        errors.append(f"No form fields found for entity type: {entity_type}" +
                      (f" and shape: {entity_shape}" if entity_shape else ""))
        return errors

    entity_fields = form_fields[entity_key]
    properties = structured_data.get("properties", {})

    for prop_uri, prop_values in properties.items():
        # rdf:type is asserted separately during creation; skip it here.
        if URIRef(prop_uri) == RDF.type:
            continue

        field_definitions = entity_fields.get(prop_uri)
        if not field_definitions:
            errors.append(
                gettext(
                    "Unknown property %(prop_uri)s for entity type %(entity_type)s",
                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                    entity_type=custom_filter.human_readable_class(entity_key),
                )
            )
            continue

        if not isinstance(prop_values, list):
            prop_values = [prop_values]

        # The shape declared on the first value selects which field
        # definition applies to this property.
        property_shape = None
        if prop_values and isinstance(prop_values[0], dict):
            property_shape = prop_values[0].get("shape")

        matching_field_def = None
        for field_def in field_definitions:
            if property_shape:
                if field_def.get("subjectShape") == property_shape:
                    matching_field_def = field_def
                    break
            else:
                if not field_def.get("subjectShape"):
                    matching_field_def = field_def
                    break

        # Fall back to the first definition when no shape-specific match.
        if not matching_field_def and field_definitions:
            matching_field_def = field_definitions[0]

        if matching_field_def:
            # Cardinality checks for properties that are present.
            min_count = matching_field_def.get("min", 0)
            max_count = matching_field_def.get("max", None)
            value_count = len(prop_values)

            if value_count < min_count:
                value = gettext("values") if min_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s requires at least %(min_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        min_count=min_count,
                        value=value,
                    )
                )
            if max_count is not None and value_count > max_count:
                value = gettext("values") if max_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s allows at most %(max_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        max_count=max_count,
                        value=value,
                    )
                )

            mandatory_values = matching_field_def.get("mandatory_values", [])
            for mandatory_value in mandatory_values:
                if mandatory_value not in prop_values:
                    errors.append(
                        gettext(
                            "Property %(prop_uri)s requires the value %(mandatory_value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            mandatory_value=mandatory_value,
                        )
                    )

            for value in prop_values:
                if isinstance(value, dict) and "entity_type" in value:
                    # Nested entity: validate it recursively.
                    nested_errors = validate_entity_data(value)
                    errors.extend(nested_errors)
                else:
                    # Simple value: must satisfy at least one declared datatype.
                    datatypes = matching_field_def.get("datatypes", [])
                    if datatypes:
                        is_valid_datatype = False
                        for dtype in datatypes:
                            validation_func = next(
                                (
                                    d[1]
                                    for d in DATATYPE_MAPPING
                                    if d[0] == URIRef(dtype)
                                ),
                                None,
                            )
                            if validation_func and validation_func(value):
                                is_valid_datatype = True
                                break
                        if not is_valid_datatype:
                            expected_types = ", ".join(
                                [
                                    custom_filter.human_readable_predicate(dtype, entity_key)
                                    for dtype in datatypes
                                ]
                            )
                            errors.append(
                                gettext(
                                    'Value "%(value)s" for property %(prop_uri)s is not of expected type %(expected_types)s',
                                    value=value,
                                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                    expected_types=expected_types
                                )
                            )

                    # Closed value list, when the field declares one.
                    optional_values = matching_field_def.get("optionalValues", [])
                    if optional_values and value not in optional_values:
                        acceptable_values = ", ".join(
                            [
                                custom_filter.human_readable_predicate(val, entity_key)
                                for val in optional_values
                            ]
                        )
                        errors.append(
                            gettext(
                                'Value "%(value)s" is not permitted for property %(prop_uri)s. Acceptable values are: %(acceptable_values)s',
                                value=value,
                                prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                acceptable_values=acceptable_values
                            )
                        )

    # In the RDF model, a property with zero values is equivalent to the property being absent,
    # as a triple requires a subject, predicate, and object. Therefore, this section checks for
    # properties defined in the schema that are completely absent from the input data but are
    # required (min_count > 0). This complements the cardinality check above, which only
    # validates properties that are present in the data.
    # Check for missing required properties
    for prop_uri, field_definitions in entity_fields.items():
        if prop_uri not in properties:
            for field_def in field_definitions:
                min_count = field_def.get("min", 0)
                if min_count > 0:
                    value = gettext("values") if min_count > 1 else gettext("value")
                    errors.append(
                        gettext(
                            "Missing required property: %(prop_uri)s requires at least %(min_count)d %(value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            min_count=min_count,
                            value=value,
                        )
                    )
                    break  # Only need to report once per property

    return errors

854 

855 

@entity_bp.route("/entity-history/<path:entity_uri>")
@login_required
def entity_history(entity_uri):
    """
    Display the history of changes for an entity as a timeline.

    Builds one timeline event per provenance snapshot, including the
    responsible agent, primary source, a human-readable description, the
    parsed modifications, and (for every non-latest snapshot) a restore
    button, then renders them via the history template.

    Args:
        entity_uri: URI of the entity
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    # Full time-agnostic history: related objects, merged entities and
    # reverse relations are materialized in each snapshot graph.
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    # Snapshot metadata ordered chronologically by generation time.
    sorted_metadata = sorted(
        provenance[entity_uri].items(),
        key=lambda x: convert_to_datetime(x[1]["generatedAtTime"]),
    )
    # Stringified timestamps aligned 1:1 with sorted_metadata; these are the
    # keys of the per-entity history dict.
    sorted_timestamps = [
        convert_to_datetime(meta["generatedAtTime"], stringify=True)
        for _, meta in sorted_metadata
    ]

    # Get correct context for entity label: when the latest snapshot is a
    # deletion (it carries an invalidatedAtTime), fall back to the snapshot
    # before it, since the deleted state has no usable triples.
    latest_metadata = sorted_metadata[-1][1] if sorted_metadata else None
    is_latest_deletion = (
        latest_metadata
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    )
    if is_latest_deletion and len(sorted_timestamps) > 1:
        context_snapshot = history[entity_uri][sorted_timestamps[-2]]
    else:
        context_snapshot = history[entity_uri][sorted_timestamps[-1]]

    entity_classes = [str(triple[2]) for triple in context_snapshot.triples((URIRef(entity_uri), RDF.type, None))]
    highest_priority_class = get_highest_priority_class(entity_classes)

    snapshot_entity_shape = determine_shape_for_entity_triples(
        list(context_snapshot.triples((URIRef(entity_uri), None, None)))
    )

    # Generate timeline events
    events = []
    for i, (snapshot_uri, metadata) in enumerate(sorted_metadata):
        date = convert_to_datetime(metadata["generatedAtTime"])
        snapshot_timestamp_str = convert_to_datetime(
            metadata["generatedAtTime"], stringify=True
        )
        snapshot_graph = history[entity_uri][snapshot_timestamp_str]

        responsible_agent = custom_filter.format_agent_reference(
            metadata["wasAttributedTo"]
        )
        primary_source = custom_filter.format_source_reference(
            metadata["hadPrimarySource"]
        )

        # Human-readable description (replaces raw URIs with entity labels).
        description = _format_snapshot_description(
            metadata,
            entity_uri,
            highest_priority_class,
            context_snapshot,
            history,
            sorted_timestamps,
            i,
            custom_filter,
        )
        modifications = metadata.get("hasUpdateQuery", "")
        modification_text = ""
        if modifications:
            # Render the snapshot's SPARQL update as an HTML additions/deletions list.
            parsed_modifications = parse_sparql_update(modifications)
            modification_text = generate_modification_text(
                parsed_modifications,
                highest_priority_class,
                snapshot_entity_shape,
                history=history,
                entity_uri=entity_uri,
                current_snapshot=snapshot_graph,
                current_snapshot_timestamp=snapshot_timestamp_str,
                custom_filter=custom_filter,
            )

        # Check if this version can be restored (not the latest version and there are multiple versions)
        can_restore = len(sorted_metadata) > 1 and i + 1 < len(sorted_metadata)
        restore_button = ""
        if can_restore:
            restore_button = f"""
                <form action='/restore-version/{entity_uri}/{metadata["generatedAtTime"]}' method='post' class='d-inline restore-form'>
                    <button type='submit' class='btn btn-success restore-btn'>
                        <i class='bi bi-arrow-counterclockwise me-1'></i>{gettext('Restore')}
                    </button>
                </form>
            """

        # Timeline event payload for this snapshot (consumed by the
        # timeline widget in entity/history.jinja).
        event = {
            "start_date": {
                "year": date.year,
                "month": date.month,
                "day": date.day,
                "hour": date.hour,
                "minute": date.minute,
                "second": date.second,
            },
            "text": {
                "headline": gettext("Snapshot") + " " + str(i + 1),
                "text": f"""
                    <p><strong>{gettext('Responsible agent')}:</strong> {responsible_agent}</p>
                    <p><strong>{gettext('Primary source')}:</strong> {primary_source}</p>
                    <p><strong>{gettext('Description')}:</strong> {description}</p>
                    <div class="modifications mb-3">
                        {modification_text}
                    </div>
                    <div class="d-flex gap-2 mt-2">
                        <a href='/entity-version/{entity_uri}/{metadata["generatedAtTime"]}' class='btn btn-outline-primary view-version' target='_self'>{gettext('View version')}</a>
                        {restore_button}
                    </div>
                """,
            },
            "autolink": False,
        }

        # The event spans until the next snapshot's generation time.
        if i + 1 < len(sorted_metadata):
            next_date = convert_to_datetime(
                sorted_metadata[i + 1][1]["generatedAtTime"]
            )
            event["end_date"] = {
                "year": next_date.year,
                "month": next_date.month,
                "day": next_date.day,
                "hour": next_date.hour,
                "minute": next_date.minute,
                "second": next_date.second,
            }

        events.append(event)

    entity_label = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, snapshot_entity_shape), context_snapshot
    )

    timeline_data = {
        "entityUri": entity_uri,
        "entityLabel": entity_label,
        "entityClasses": list(entity_classes),
        "entityShape": snapshot_entity_shape,
        "events": events,
    }

    return render_template("entity/history.jinja", timeline_data=timeline_data)

1009 

1010 

def _format_snapshot_description(
    metadata: dict,
    entity_uri: str,
    highest_priority_class: str,
    context_snapshot: Graph,
    history: dict,
    sorted_timestamps: list[str],
    current_index: int,
    custom_filter: Filter,
) -> str:
    """
    Format a snapshot's provenance description for display.

    Replaces raw URIs in the description with human-readable labels: the
    main entity's URI is always resolved; for merge snapshots (those
    derived from more than one prior snapshot) the merged-in entity's URI
    is resolved against the snapshot *preceding* the merge, since the
    merged entity no longer exists afterwards.

    Args:
        metadata: The snapshot metadata dictionary.
        entity_uri: The URI of the main entity.
        highest_priority_class: The highest priority class for the entity.
        context_snapshot: The graph snapshot used to resolve the main entity's label.
        history: Mapping of entity URI -> {timestamp: Graph} snapshots.
        sorted_timestamps: Sorted list of snapshot timestamps for the entity.
        current_index: The index of the current snapshot in sorted_timestamps.
        custom_filter: The custom filter instance used for label formatting.

    Returns:
        The formatted description string.
    """
    # NOTE: the original return annotation claimed Tuple[str, bool], but the
    # function has always returned only the description string; the
    # annotation is corrected to match the actual (and documented) behavior.
    description = metadata.get("description", "")

    # A snapshot derived from more than one predecessor is a merge snapshot.
    was_derived_from = metadata.get('wasDerivedFrom')
    is_merge_snapshot = isinstance(was_derived_from, list) and len(was_derived_from) > 1

    if is_merge_snapshot:
        # Regex to find URI after "merged with", potentially enclosed in single quotes or none
        match = re.search(r"merged with ['‘]?([^'’<>\s]+)['’]?", description)
        if match:
            potential_merged_uri = match.group(1)
            if validators.url(potential_merged_uri):
                merged_entity_uri_from_desc = potential_merged_uri
                # Resolve the merged entity's label from the snapshot before
                # the merge; it is absent from later snapshots.
                if current_index > 0:
                    previous_snapshot_timestamp = sorted_timestamps[current_index - 1]
                    previous_snapshot_graph = history.get(entity_uri, {}).get(
                        previous_snapshot_timestamp
                    )
                    if previous_snapshot_graph:
                        raw_merged_entity_classes = [
                            str(o)
                            for s, p, o in previous_snapshot_graph.triples(
                                (URIRef(merged_entity_uri_from_desc), RDF.type, None)
                            )
                        ]
                        highest_priority_merged_class = (
                            get_highest_priority_class(raw_merged_entity_classes)
                            if raw_merged_entity_classes
                            else None
                        )

                        shape = determine_shape_for_classes(raw_merged_entity_classes)
                        merged_entity_label = custom_filter.human_readable_entity(
                            merged_entity_uri_from_desc,
                            (highest_priority_merged_class, shape),
                            previous_snapshot_graph,
                        )
                        # Only substitute when a real label (not the bare URI)
                        # was found.
                        if (
                            merged_entity_label
                            and merged_entity_label != merged_entity_uri_from_desc
                        ):
                            description = description.replace(
                                match.group(0), f"merged with '{merged_entity_label}'"
                            )

    # Always try to replace the main entity's quoted URI with its label.
    shape = determine_shape_for_classes([highest_priority_class])
    entity_label_for_desc = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, shape), context_snapshot
    )
    if entity_label_for_desc and entity_label_for_desc != entity_uri:
        description = description.replace(f"'{entity_uri}'", f"'{entity_label_for_desc}'")

    return description

1087 

1088 

@entity_bp.route("/entity-version/<path:entity_uri>/<timestamp>")
@login_required
def entity_version(entity_uri, timestamp):
    """
    Display a specific version of an entity.

    Resolves the requested timestamp (either an ISO timestamp or a snapshot
    identifier), finds the closest recorded snapshot, and renders its
    triples, provenance metadata, modifications, and prev/next navigation.

    Args:
        entity_uri: URI of the entity
        timestamp: Timestamp of the version to display
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    try:
        timestamp_dt = datetime.fromisoformat(timestamp)
    except ValueError:
        # Not an ISO timestamp: treat it as a snapshot identifier and look
        # up its generation time in the provenance store.
        provenance_sparql = get_provenance_sparql()
        query_timestamp = f"""
            SELECT ?generation_time
            WHERE {{
                <{entity_uri}/prov/se/{timestamp}> <http://www.w3.org/ns/prov#generatedAtTime> ?generation_time.
            }}
        """
        provenance_sparql.setQuery(query_timestamp)
        provenance_sparql.setReturnFormat(JSON)
        try:
            generation_time = provenance_sparql.queryAndConvert()["results"][
                "bindings"
            ][0]["generation_time"]["value"]
        except IndexError:
            # No such snapshot recorded.
            abort(404)
        timestamp = generation_time
        timestamp_dt = datetime.fromisoformat(generation_time)

    # Full time-agnostic history including related/merged/reverse entities.
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)
    main_entity_history = history.get(entity_uri, {})
    sorted_timestamps = sorted(
        main_entity_history.keys(), key=lambda t: convert_to_datetime(t)
    )

    if not sorted_timestamps:
        abort(404)

    # Snapshot whose generation time is nearest to the requested one.
    closest_timestamp = min(
        sorted_timestamps,
        key=lambda t: abs(
            convert_to_datetime(t).astimezone() - timestamp_dt.astimezone()
        ),
    )

    version = main_entity_history[closest_timestamp]
    triples = list(version.triples((URIRef(entity_uri), None, None)))

    entity_metadata = provenance.get(entity_uri, {})
    closest_metadata = None
    min_time_diff = None

    # NOTE(review): latest_timestamp is a history-dict key compared below
    # against metadata's generatedAtTime — assumes both use the same string
    # format; confirm against convert_to_datetime(stringify=True).
    latest_timestamp = max(sorted_timestamps)
    latest_metadata = None

    # Single pass: find both the metadata closest to the requested time and
    # the metadata of the latest snapshot.
    for se_uri, meta in entity_metadata.items():
        meta_time = convert_to_datetime(meta["generatedAtTime"])
        time_diff = abs((meta_time - timestamp_dt).total_seconds())

        if closest_metadata is None or time_diff < min_time_diff:
            closest_metadata = meta
            min_time_diff = time_diff

        if meta["generatedAtTime"] == latest_timestamp:
            latest_metadata = meta

    if closest_metadata is None or latest_metadata is None:
        abort(404)

    # A deletion snapshot is the latest one with an invalidation time, or
    # any snapshot that carries no triples for the entity.
    is_deletion_snapshot = (
        closest_timestamp == latest_timestamp
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    ) or len(triples) == 0

    # For deletions, use the previous snapshot as display context so labels
    # and shapes can still be resolved.
    context_version = version
    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        current_index = sorted_timestamps.index(closest_timestamp)
        if current_index > 0:
            context_version = main_entity_history[sorted_timestamps[current_index - 1]]

    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        subject_classes = [
            o
            for _, _, o in context_version.triples((URIRef(entity_uri), RDF.type, None))
        ]
    else:
        subject_classes = [
            o for _, _, o in version.triples((URIRef(entity_uri), RDF.type, None))
        ]

    highest_priority_class = get_highest_priority_class(subject_classes)

    entity_shape = determine_shape_for_entity_triples(
        list(context_version.triples((URIRef(entity_uri), None, None)))
    )

    _, _, _, _, _, valid_predicates = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

    grouped_triples, relevant_properties = get_grouped_triples(
        entity_uri,
        triples,
        valid_predicates,
        historical_snapshot=context_version,
        highest_priority_class=highest_priority_class,
        highest_priority_shape=entity_shape
    )

    # Distinct snapshot datetimes, used for the version number and the
    # prev/next navigation links.
    snapshot_times = [
        convert_to_datetime(meta["generatedAtTime"])
        for meta in entity_metadata.values()
    ]
    snapshot_times = sorted(set(snapshot_times))
    # NOTE(review): assumes timestamp_dt matches a recorded snapshot time
    # exactly; .index() would raise ValueError otherwise — confirm callers
    # always pass recorded timestamps.
    version_number = snapshot_times.index(timestamp_dt) + 1

    next_snapshot_timestamp = None
    prev_snapshot_timestamp = None

    # First snapshot strictly after the requested time.
    for snap_time in snapshot_times:
        if snap_time > timestamp_dt:
            next_snapshot_timestamp = snap_time.isoformat()
            break

    # Last snapshot strictly before the requested time.
    for snap_time in reversed(snapshot_times):
        if snap_time < timestamp_dt:
            prev_snapshot_timestamp = snap_time.isoformat()
            break

    modifications = ""
    if closest_metadata.get("hasUpdateQuery"):
        # Render this snapshot's SPARQL update as an HTML change list.
        sparql_query = closest_metadata["hasUpdateQuery"]
        parsed_modifications = parse_sparql_update(sparql_query)
        modifications = generate_modification_text(
            parsed_modifications,
            highest_priority_class,
            entity_shape,
            history,
            entity_uri,
            context_version,
            closest_timestamp,
            custom_filter,
        )

    try:
        current_index = sorted_timestamps.index(closest_timestamp)
    except ValueError:
        current_index = -1

    if closest_metadata.get("description"):
        # Replace raw URIs in the provenance description with labels.
        formatted_description = _format_snapshot_description(
            closest_metadata,
            entity_uri,
            highest_priority_class,
            context_version,
            history,
            sorted_timestamps,
            current_index,
            custom_filter,
        )
        closest_metadata["description"] = formatted_description

    closest_timestamp = closest_metadata["generatedAtTime"]

    return render_template(
        "entity/version.jinja",
        subject=entity_uri,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        metadata={closest_timestamp: closest_metadata},
        timestamp=closest_timestamp,
        next_snapshot_timestamp=next_snapshot_timestamp,
        prev_snapshot_timestamp=prev_snapshot_timestamp,
        modifications=modifications,
        grouped_triples=grouped_triples,
        version_number=version_number,
        version=context_version,
    )

1274 

1275 

@entity_bp.route("/restore-version/<path:entity_uri>/<timestamp>", methods=["POST"])
@login_required
def restore_version(entity_uri, timestamp):
    """
    Restore an entity to a previous version.

    Computes the difference between the current dataset state and the
    historical snapshot, replays it through an Editor (which records
    provenance), and marks any currently-deleted entities involved in the
    diff as restored.

    Args:
        entity_uri: URI of the entity to restore
        timestamp: Timestamp of the version to restore to
    """
    timestamp = convert_to_datetime(timestamp, stringify=True)
    change_tracking_config = get_change_tracking_config()

    # Get entity history
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    historical_graph = history.get(entity_uri, {}).get(timestamp)
    if historical_graph is None:
        abort(404)

    current_graph = fetch_current_state_with_related_entities(provenance)

    # No triples for the entity in the current state means it was deleted.
    is_deleted = len(list(current_graph.triples((URIRef(entity_uri), None, None)))) == 0

    triples_or_quads_to_delete, triples_or_quads_to_add = compute_graph_differences(
        current_graph, historical_graph
    )

    # Get all entities that need restoration
    entities_to_restore = get_entities_to_restore(
        triples_or_quads_to_delete, triples_or_quads_to_add, entity_uri
    )

    # Prepare snapshot information for all entities
    entity_snapshots = prepare_entity_snapshots(
        entities_to_restore, provenance, timestamp
    )

    # Create editor instance
    editor = Editor(
        get_dataset_endpoint(),
        get_provenance_endpoint(),
        current_app.config["COUNTER_HANDLER"],
        URIRef(get_responsible_agent_uri(current_user.orcid)),
        # A deleted entity has no usable current source snapshot.
        None if is_deleted else entity_snapshots[entity_uri]["source"],
        current_app.config["DATASET_GENERATION_TIME"],
        dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
    )

    # Import current state into editor
    if get_dataset_is_quadstore():
        for quad in current_graph.quads():
            editor.g_set.add(quad)
    else:
        for triple in current_graph:
            editor.g_set.add(triple)

    editor.preexisting_finished()

    # Apply deletions
    for item in triples_or_quads_to_delete:
        # Items are quads when the dataset is a quadstore, triples otherwise.
        if len(item) == 4:
            editor.delete(item[0], item[1], item[2], item[3])
        else:
            editor.delete(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                # NOTE(review): deletions record the source under
                # "restoration_source" while additions use "source" —
                # confirm this asymmetry is intentional.
                editor.g_set.entity_index[URIRef(subject)]["restoration_source"] = (
                    entity_info["source"]
                )

    # Apply additions
    for item in triples_or_quads_to_add:
        if len(item) == 4:
            editor.create(item[0], item[1], item[2], item[3])
        else:
            editor.create(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["source"] = entity_info[
                    "source"
                ]

    # Handle main entity restoration if needed
    if is_deleted and entity_uri in entity_snapshots:
        editor.g_set.mark_as_restored(URIRef(entity_uri))
        source = entity_snapshots[entity_uri]["source"]
        editor.g_set.entity_index[URIRef(entity_uri)]["source"] = source

    try:
        editor.save()
        flash(gettext("Version restored successfully"), "success")
    except Exception as e:
        # Surface the failure to the user; the redirect below still happens.
        flash(
            gettext(
                "An error occurred while restoring the version: %(error)s", error=str(e)
            ),
            "error",
        )

    return redirect(url_for("entity.about", subject=entity_uri))

1388 

1389 

def compute_graph_differences(
    current_graph: Graph | ConjunctiveGraph, historical_graph: Graph | ConjunctiveGraph
):
    """
    Compute the statement-level diff between the current and historical graphs.

    Works on quads when the dataset is a quadstore, on triples otherwise.

    Returns:
        A pair of sets: (statements present now but absent historically,
        statements present historically but absent now).
    """
    quadstore = get_dataset_is_quadstore()
    if quadstore:
        present = set(current_graph.quads())
        target = set(historical_graph.quads())
    else:
        present = set(current_graph.triples((None, None, None)))
        target = set(historical_graph.triples((None, None, None)))

    to_delete = present - target
    to_add = target - present
    return to_delete, to_add

1403 

1404 

def get_entities_to_restore(
    triples_or_quads_to_delete: set, triples_or_quads_to_add: set, main_entity_uri: str
) -> set:
    """
    Identify all entities that need to be restored based on the graph differences.

    Every URI appearing as subject or object of a changed statement (other
    than rdf:type statements) is collected, alongside the main entity itself.

    Args:
        triples_or_quads_to_delete: Set of triples/quads to be deleted
        triples_or_quads_to_add: Set of triples/quads to be added
        main_entity_uri: URI of the main entity being restored

    Returns:
        Set of entity URIs that need to be restored
    """
    entities_to_restore = {main_entity_uri}

    # Set union instead of list concatenation: both inputs are sets and
    # iteration order is irrelevant when accumulating into a set.
    for item in triples_or_quads_to_delete | triples_or_quads_to_add:
        # rdf:type statements never pull additional entities into the restore.
        if str(item[1]) == str(RDF.type):
            continue

        subject = str(item[0])
        obj = str(item[2])
        for uri in (subject, obj):
            if uri != main_entity_uri and validators.url(uri):
                entities_to_restore.add(uri)

    return entities_to_restore

1433 

1434 

def prepare_entity_snapshots(
    entities_to_restore: set, provenance: dict, target_time: str
) -> dict:
    """
    Build restoration info for every entity involved in a restore.

    For each entity with provenance data, select the snapshot to use as the
    provenance source and record whether the entity is currently deleted
    (and therefore needs to be explicitly marked as restored).

    Args:
        entities_to_restore: Set of entity URIs to process
        provenance: Dictionary containing provenance data for all entities
        target_time: Target restoration time

    Returns:
        Dictionary mapping entity URIs to their restoration information
    """
    snapshots_info = {}

    for uri in entities_to_restore:
        if uri not in provenance:
            continue
        entity_provenance = provenance[uri]

        # Pick the snapshot that will serve as the provenance source.
        source_snapshot = find_appropriate_snapshot(entity_provenance, target_time)
        if not source_snapshot:
            continue

        # The newest snapshot tells us whether the entity is currently
        # deleted: a deletion snapshot has generation == invalidation time.
        chronological = sorted(
            entity_provenance.items(),
            key=lambda pair: convert_to_datetime(pair[1]["generatedAtTime"]),
        )
        newest_meta = chronological[-1][1]
        is_deleted = (
            newest_meta.get("invalidatedAtTime")
            and newest_meta["generatedAtTime"] == newest_meta["invalidatedAtTime"]
        )

        snapshots_info[uri] = {
            "source": source_snapshot,
            "needs_restore": is_deleted,
        }

    return snapshots_info

1478 

1479 

def find_appropriate_snapshot(provenance_data: dict, target_time: str) -> Optional[str]:
    """
    Find the most appropriate snapshot to use as a source for restoration.

    Deletion snapshots (generation time equal to invalidation time) are
    never eligible; among the rest, the latest snapshot generated at or
    before the target time wins.

    Args:
        provenance_data: Dictionary of snapshots and their metadata for an entity
        target_time: The target restoration time as ISO format string

    Returns:
        The URI of the most appropriate snapshot, or None if no suitable snapshot is found
    """
    cutoff = convert_to_datetime(target_time)

    candidates = []
    for snapshot_uri, metadata in provenance_data.items():
        invalidated = metadata.get("invalidatedAtTime")
        # Skip deletion snapshots: they carry no usable entity state.
        if invalidated and metadata["generatedAtTime"] == invalidated:
            continue

        created = convert_to_datetime(metadata["generatedAtTime"])
        # Only snapshots generated at or before the target time qualify.
        if created <= cutoff:
            candidates.append((created, snapshot_uri))

    if not candidates:
        return None

    # Most recent qualifying snapshot (stable sort, take the last).
    candidates.sort(key=lambda pair: pair[0])
    return candidates[-1][1]

1515 

1516 

def determine_object_class_and_shape(object_value: str, relevant_snapshot: Graph) -> tuple[Optional[str], Optional[str]]:
    """
    Resolve the class and shape of an object value within a graph snapshot.

    Args:
        object_value: The object value (URI or literal)
        relevant_snapshot: Graph snapshot to query for object information

    Returns:
        Tuple of (object_class, object_shape_uri), or (None, None) when the
        value is not a URI, the snapshot is missing, or the snapshot does
        not describe the object.
    """
    # Literals and absent snapshots cannot be resolved.
    if not validators.url(str(object_value)) or not relevant_snapshot:
        return None, None

    object_ref = URIRef(object_value)
    describing_triples = list(relevant_snapshot.triples((object_ref, None, None)))
    if not describing_triples:
        return None, None

    shape_uri = determine_shape_for_entity_triples(describing_triples)

    rdf_types = [
        str(type_value)
        for _, _, type_value in relevant_snapshot.triples((object_ref, RDF.type, None))
    ]
    resolved_class = get_highest_priority_class(rdf_types) if rdf_types else None

    return resolved_class, shape_uri

1545 

1546 

def generate_modification_text(
    modifications,
    highest_priority_class,
    entity_shape,
    history,
    entity_uri,
    current_snapshot,
    current_snapshot_timestamp,
    custom_filter: Filter,
) -> str:
    """
    Generate HTML text describing modifications to an entity, using display rules for property ordering.

    Renders one <ul> per modification type (Additions/Deletions). Within
    each list, triples are grouped by (predicate, object shape), ordered by
    the display rules' property order and shape order, and — for predicates
    that define an ordering property — sorted by the object's position in
    the sequence.

    Args:
        modifications (dict): Dictionary of modifications from parse_sparql_update
        highest_priority_class (str): The highest priority class for the subject entity
        entity_shape (str): The shape for the subject entity
        history (dict): Historical snapshots dictionary
        entity_uri (str): URI of the entity being modified
        current_snapshot (Graph): Current entity snapshot
        current_snapshot_timestamp (str): Timestamp of current snapshot
        custom_filter (Filter): Filter instance for formatting

    Returns:
        str: HTML text describing the modifications
    """
    modification_text = "<p><strong>" + gettext("Modifications") + "</strong></p>"

    # Property order defined by the display rules for this class/shape pair.
    ordered_properties = get_property_order_from_rules(highest_priority_class, entity_shape)

    for mod_type, triples in modifications.items():
        modification_text += "<ul class='list-group mb-3'><p>"
        if mod_type == gettext("Additions"):
            modification_text += '<i class="bi bi-plus-circle-fill text-success"></i>'
        elif mod_type == gettext("Deletions"):
            modification_text += '<i class="bi bi-dash-circle-fill text-danger"></i>'
        modification_text += " <em>" + gettext(mod_type) + "</em></p>"

        object_shapes_cache = {}
        object_classes_cache = {}

        # For deletions the objects may no longer exist in the current
        # snapshot, so resolve labels against the *previous* snapshot.
        relevant_snapshot = None
        if (
            mod_type == gettext("Deletions")
            and history
            and entity_uri
            and current_snapshot_timestamp
        ):
            sorted_timestamps = sorted(history[entity_uri].keys())
            current_index = sorted_timestamps.index(current_snapshot_timestamp)
            if current_index > 0:
                relevant_snapshot = history[entity_uri][
                    sorted_timestamps[current_index - 1]
                ]
        else:
            relevant_snapshot = current_snapshot

        # Pre-compute class and shape for every object value in this group.
        if relevant_snapshot:
            for triple in triples:
                object_value = triple[2]
                object_class, object_shape = determine_object_class_and_shape(object_value, relevant_snapshot)
                object_classes_cache[str(object_value)] = object_class
                object_shapes_cache[str(object_value)] = object_shape

        predicate_shape_groups = {}
        predicate_ordering_cache = {}
        entity_position_cache = {}

        # Group triples by (predicate, object shape); cache each predicate's
        # ordering property and each ordered object's sequence position.
        for triple in triples:
            predicate = str(triple[1])
            object_value = str(triple[2])
            object_shape_uri = object_shapes_cache.get(object_value)

            if predicate not in predicate_ordering_cache:
                predicate_ordering_cache[predicate] = get_predicate_ordering_info(predicate, highest_priority_class, entity_shape)

            order_property = predicate_ordering_cache[predicate]
            if order_property and validators.url(object_value) and relevant_snapshot:
                position_key = (object_value, predicate)
                if position_key not in entity_position_cache:
                    entity_position_cache[position_key] = get_entity_position_in_sequence(
                        object_value, entity_uri, predicate, order_property, relevant_snapshot
                    )

            group_key = (predicate, object_shape_uri)
            if group_key not in predicate_shape_groups:
                predicate_shape_groups[group_key] = []
            predicate_shape_groups[group_key].append(triple)

        processed_predicates = set()

        def get_cached_position(triple, predicate_uri):
            # +inf fallback so objects without a known position sort last.
            object_value = str(triple[2])
            position_key = (object_value, predicate_uri)
            return entity_position_cache.get(position_key, float('inf'))

        # First pass: predicates in display-rule order; within a predicate,
        # groups ordered by the display rules' shape order.
        for predicate in ordered_properties:
            shape_order = get_shape_order_from_display_rules(highest_priority_class, entity_shape, predicate)
            predicate_groups = []
            for group_key, group_triples in predicate_shape_groups.items():
                predicate_uri, object_shape_uri = group_key
                if predicate_uri == predicate:
                    if object_shape_uri and object_shape_uri in shape_order:
                        shape_priority = shape_order.index(object_shape_uri)
                    else:
                        # Objects without shapes or shapes not in display rules go at the end
                        shape_priority = len(shape_order)

                    predicate_groups.append((shape_priority, group_key, group_triples))

            predicate_groups.sort(key=lambda x: x[0])
            for _, group_key, group_triples in predicate_groups:
                processed_predicates.add(group_key)

                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                # Ordered predicates: sort objects by their sequence position.
                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        # Then handle any remaining predicate+shape groups not in the ordered list
        for group_key, group_triples in predicate_shape_groups.items():
            if group_key not in processed_predicates:
                # Sort remaining triples by their cached positions too
                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        modification_text += "</ul>"

    return modification_text

1708 

1709 

def format_triple_modification(
    triple: Tuple[URIRef, URIRef, URIRef|Literal],
    highest_priority_class: str,
    entity_shape: str,
    object_shapes_cache: dict,
    object_classes_cache: dict,
    relevant_snapshot: Optional[Graph],
    custom_filter: Filter,
    subject_uri: str = None,
    predicate_ordering_cache: Optional[dict] = None,
    entity_position_cache: Optional[dict] = None,
) -> str:
    """
    Render one added/deleted triple as an HTML list item.

    Args:
        triple: The RDF triple being modified
        highest_priority_class: The highest priority class for the subject entity
        entity_shape: The shape for the subject entity
        object_shapes_cache: Pre-computed cache of object shapes keyed by object value
        object_classes_cache: Pre-computed cache of object classes keyed by object value
        relevant_snapshot: Graph snapshot for context
        custom_filter (Filter): Filter instance for formatting
        subject_uri: URI of the subject entity (for ordering queries)
        predicate_ordering_cache: Optional cache mapping predicate -> ordering property
        entity_position_cache: Optional cache mapping (object, predicate) -> position

    Returns:
        str: HTML text describing the modification
    """
    predicate = triple[1]
    object_value = triple[2]
    object_key = str(object_value)
    predicate_key = str(predicate)

    object_shape_uri = object_shapes_cache.get(object_key)
    object_class = object_classes_cache.get(object_key)

    predicate_label = custom_filter.human_readable_predicate(
        predicate, (highest_priority_class, entity_shape), object_shape_uri=object_shape_uri
    )

    object_label = get_object_label(
        object_value,
        predicate,
        object_shape_uri,
        object_class,
        relevant_snapshot,
        custom_filter,
        subject_entity_key=(highest_priority_class, entity_shape),
    )

    # Append a "#N" badge when this predicate orders its objects and the
    # object's position in the sequence is cached.
    order_info = ""
    if (
        subject_uri
        and validators.url(object_key)
        and predicate_ordering_cache
        and entity_position_cache
        and predicate_ordering_cache.get(predicate_key)
    ):
        position = entity_position_cache.get((object_key, predicate_key))
        if position is not None:
            order_info = f' <span class="order-position-badge">#{position}</span>'

    return f"""
        <li class='d-flex align-items-center'>
            <span class='flex-grow-1 d-flex flex-column justify-content-center ms-3 mb-2 w-100'>
                <strong>{predicate_label}{order_info}</strong>
                <span class="object-value word-wrap">{object_label}</span>
            </span>
        </li>"""

1775 

1776 

def get_object_label(
    object_value: str,
    predicate: str,
    object_shape_uri: Optional[str],
    object_class: Optional[str],
    snapshot: Optional[Graph],
    custom_filter: Filter,
    subject_entity_key: Optional[tuple] = None,
) -> str:
    """
    Resolve a human-readable display label for an object value.

    Args:
        object_value: The raw object value (URI or literal) to label
        predicate: The predicate URI linking subject and object
        object_shape_uri: Pre-computed shape URI for the object, if any
        object_class: Pre-computed class URI for the object, if any
        snapshot: Graph snapshot for context (essential for deleted triples)
        custom_filter (Filter): Custom filter instance for formatting
        subject_entity_key: Tuple of (class, shape) for the subject entity

    Returns:
        str: A human-readable label for the object value
    """
    predicate_str = str(predicate)

    # rdf:type objects are rendered via the subject's class label.
    if predicate_str == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
        return custom_filter.human_readable_class(subject_entity_key)

    # URIs with a known shape or class get a resolved entity label;
    # plain URIs and literals fall back to their string form.
    if validators.url(object_value) and (object_shape_uri or object_class):
        return custom_filter.human_readable_entity(
            object_value, (object_class, object_shape_uri), snapshot
        )

    return str(object_value)

1815 

1816 

def process_modification_data(data: dict) -> Tuple[str, List[dict]]:
    """
    Extract the subject URI and modification list from a request payload.

    Args:
        data: Dictionary containing modification data

    Returns:
        Tuple containing subject URI and list of modification details

    Raises:
        ValueError: If the subject URI or the modification list is missing.
    """
    subject_uri = data.get("subject")
    modifications = data.get("modifications", [])

    # Both pieces are mandatory; fail fast with a descriptive message.
    if not subject_uri:
        raise ValueError("No subject URI provided in modification data")
    if not modifications:
        raise ValueError("No modifications provided in data")

    return subject_uri, modifications

1836 

1837 

def validate_modification(
    modification: dict, subject_uri: str
) -> Tuple[bool, str]:
    """
    Validate a single modification operation.

    Checks that an operation and predicate are present and valid, and —
    when SHACL-derived form fields are available — enforces cardinality
    constraints (minCount on removals, maxCount on additions).

    Args:
        modification: Dictionary containing modification details
            (operation, predicate, and optionally entity_type/entity_shape)
        subject_uri: URI of the subject being modified

    Returns:
        Tuple of (is_valid, error_message); error_message is "" when valid
    """
    form_fields = get_form_fields()
    operation = modification.get("operation")
    if not operation:
        return False, "No operation specified in modification"

    predicate = modification.get("predicate")
    if not predicate:
        return False, "No predicate specified in modification"

    if operation not in ["add", "remove", "update"]:
        return False, f"Invalid operation: {operation}"

    if form_fields:
        entity_type = modification.get("entity_type")
        entity_shape = modification.get("entity_shape")

        # If entity_type is not provided in modification, get it from the database
        if not entity_type:
            entity_types = get_entity_types(subject_uri)
            if entity_types:
                entity_type = get_highest_priority_class(entity_types)

        matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

        if matching_key:
            predicate_fields = form_fields[matching_key].get(predicate, [])

            # The current value count is independent of the individual field
            # definitions, so run the SPARQL count query once instead of once
            # per field inside the loop below.
            current_count = None
            if operation == "add" and predicate_fields:
                current_count = get_predicate_count(subject_uri, predicate)

            for field in predicate_fields:
                if operation == "remove" and field.get("minCount", 0) > 0:
                    return False, f"Cannot remove required predicate: {predicate}"

                if operation == "add":
                    max_count = field.get("maxCount")

                    if max_count and current_count >= max_count:
                        return (
                            False,
                            f"Maximum count exceeded for predicate: {predicate}",
                        )

    return True, ""

1893 

1894 

def get_predicate_count(subject_uri: str, predicate: str) -> int:
    """
    Count how many values the entity currently has for a predicate.

    Args:
        subject_uri: URI of the entity
        predicate: Predicate URI to count

    Returns:
        Number of values for the predicate
    """
    count_query = f"""
    SELECT (COUNT(?o) as ?count) WHERE {{
        <{subject_uri}> <{predicate}> ?o .
    }}
    """

    sparql = get_sparql()
    sparql.setQuery(count_query)
    sparql.setReturnFormat(JSON)
    response = sparql.query().convert()

    # A COUNT aggregate always yields exactly one binding row.
    binding = response["results"]["bindings"][0]
    return int(binding["count"]["value"])

1919 

1920 

def apply_modifications(
    editor: Editor,
    modifications: List[dict],
    subject_uri: str,
    graph_uri: Optional[str] = None,
) -> None:
    """
    Apply a list of modifications to an entity.

    Each modification dict carries an "operation" key ("remove", "add" or
    "update") plus the operation-specific payload keys read below.

    Args:
        editor: Editor instance to use for modifications
        modifications: List of modification operations
        subject_uri: URI of the entity being modified
        graph_uri: Optional graph URI for quad store
    """
    for mod in modifications:
        operation = mod["operation"]
        predicate = mod["predicate"]

        if operation == "remove":
            # NOTE(review): no object is passed — presumably this removes all
            # values of the predicate; confirm against Editor.delete semantics.
            editor.delete(URIRef(subject_uri), URIRef(predicate), graph_uri=graph_uri)

        elif operation == "add":
            value = mod["value"]
            datatype = mod.get("datatype", XSD.string)

            # URL-shaped values become resource references; anything else
            # becomes a typed literal (defaulting to xsd:string).
            if validators.url(value):
                object_value = URIRef(value)
            else:
                object_value = Literal(value, datatype=URIRef(datatype))

            editor.create(
                URIRef(subject_uri), URIRef(predicate), object_value, graph_uri
            )

        elif operation == "update":
            old_value = mod["oldValue"]
            new_value = mod["newValue"]
            datatype = mod.get("datatype", XSD.string)

            # Convert both old and new values with the same URL-vs-literal
            # rule so the old term matches the stored triple exactly.
            if validators.url(old_value):
                old_object = URIRef(old_value)
            else:
                old_object = Literal(old_value, datatype=URIRef(datatype))

            if validators.url(new_value):
                new_object = URIRef(new_value)
            else:
                new_object = Literal(new_value, datatype=URIRef(datatype))

            editor.update(
                URIRef(subject_uri),
                URIRef(predicate),
                old_object,
                new_object,
                graph_uri,
            )