Coverage for heritrace / routes / entity.py: 89%

783 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-21 12:56 +0000

1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import json 

6import re 

7from datetime import datetime 

8from typing import List, Optional, Tuple 

9 

10import validators 

11from flask import (Blueprint, abort, current_app, flash, jsonify, redirect, 

12 render_template, request, url_for) 

13from flask_babel import gettext 

14from flask_login import current_user, login_required 

15from rdflib import RDF, XSD, Dataset, Graph, Literal, URIRef 

16from SPARQLWrapper import JSON 

17from time_agnostic_library.agnostic_entity import AgnosticEntity 

18 

19from heritrace.apis.orcid import get_responsible_agent_uri 

20from heritrace.editor import Editor 

21from heritrace.extensions import (get_change_tracking_config, 

22 get_custom_filter, get_dataset_endpoint, 

23 get_dataset_is_quadstore, get_display_rules, 

24 get_form_fields, get_provenance_endpoint, 

25 get_provenance_sparql, get_shacl_graph, 

26 get_sparql) 

27from heritrace.forms import * 

28from heritrace.utils.converters import convert_to_datetime 

29from heritrace.utils.datatypes import DATATYPE_MAPPING, get_datatype_options 

30from heritrace.utils.display_rules_utils import ( 

31 get_class_priority, get_grouped_triples, get_highest_priority_class, 

32 get_predicate_ordering_info, get_property_order_from_rules, 

33 get_shape_order_from_display_rules, is_entity_type_visible) 

34from heritrace.utils.filters import Filter 

35from heritrace.utils.primary_source_utils import ( 

36 get_default_primary_source, save_user_default_primary_source) 

37from heritrace.utils.shacl_utils import (determine_shape_for_entity_triples, 

38 find_matching_form_field, 

39 get_entity_position_in_sequence) 

40from heritrace.utils.shacl_validation import get_valid_predicates 

41from heritrace.utils.sparql_utils import ( 

42 convert_to_rdflib_graphs, determine_shape_for_classes, 

43 fetch_current_state_with_related_entities, fetch_data_graph_for_subject, 

44 get_entity_types, get_triples_from_graph, import_referenced_entities, 

45 parse_sparql_update) 

46from heritrace.utils.uri_utils import generate_unique_uri 

47from heritrace.utils.virtual_properties import \ 

48 get_virtual_properties_for_entity, \ 

49 transform_entity_creation_with_virtual_properties, \ 

50 remove_virtual_properties_from_creation_data 

51 

def _prepare_entity_creation_data(structured_data):
    """
    Strip virtual properties from submitted creation data and extract the
    core fields needed to create a new entity.

    Args:
        structured_data: Raw creation payload from the client.

    Returns:
        Tuple of (cleaned_structured_data, entity_type, properties, entity_uri)
    """
    cleaned = remove_virtual_properties_from_creation_data(structured_data)
    declared_type = cleaned.get("entity_type")
    declared_properties = cleaned.get("properties", {})
    # Mint a fresh URI for the entity based on its declared type.
    new_uri = generate_unique_uri(declared_type)
    return cleaned, declared_type, declared_properties, new_uri

65 

66 

def _setup_editor_for_creation(editor, cleaned_structured_data):
    """
    Prime the editor before creating a new entity: import any entities the
    payload references, then mark the pre-existing state as finalized so
    subsequent create() calls are tracked as new additions.
    """
    import_referenced_entities(editor, cleaned_structured_data)
    editor.preexisting_finished()

73 

74 

def _process_virtual_properties_after_creation(editor, structured_data, entity_uri, default_graph_uri):
    """
    Materialize entities implied by virtual properties once the main entity
    has been created, then persist them with a single save.
    """
    derived = transform_entity_creation_with_virtual_properties(structured_data, str(entity_uri))
    if not derived:
        return
    for spec in derived:
        spec_uri = generate_unique_uri(spec.get("entity_type"))
        create_nested_entity(editor, spec_uri, spec, default_graph_uri)
    # One save covers every virtual entity created above.
    editor.save()

88 

89 

# Blueprint collecting the entity-related routes (about, create-entity, entity-history).
entity_bp = Blueprint("entity", __name__)

91 

92 

def get_deleted_entity_context_info(is_deleted: bool, sorted_timestamps: List[str],
                                    history: dict, subject: str) -> Tuple[Optional[Graph], Optional[str], Optional[str]]:
    """
    Extract context information for a deleted entity that still has history.

    For a deleted entity with more than one timestamp, the second-to-last
    snapshot is used as context to recover the entity's highest priority
    class and its SHACL shape.

    Args:
        is_deleted: Whether the entity is deleted
        sorted_timestamps: List of timestamps in chronological order
        history: Dictionary mapping subject -> timestamp -> Graph
        subject: The entity URI as string

    Returns:
        Tuple of (context_snapshot, highest_priority_class, entity_shape),
        or (None, None, None) when the entity is not deleted or has at most
        one timestamp.
    """
    if not is_deleted or len(sorted_timestamps) <= 1:
        return None, None, None

    subject_ref = URIRef(subject)
    # Last snapshot of a deleted entity is empty; use the one before it.
    context_snapshot = history[subject][sorted_timestamps[-2]]

    type_objects = [
        obj
        for _, _, obj in get_triples_from_graph(
            context_snapshot, (subject_ref, RDF.type, None)
        )
    ]
    priority_class = get_highest_priority_class(type_objects)
    shape = determine_shape_for_entity_triples(
        list(get_triples_from_graph(context_snapshot, (subject_ref, None, None)))
    )
    return context_snapshot, priority_class, shape

130 

131 

@entity_bp.route("/about/<path:subject>")
@login_required
def about(subject):
    """
    Display detailed information about an entity.

    Renders the about page with the entity's current triples (grouped for
    display), its change history, and the predicates that may be added or
    deleted according to the form fields and display rules.

    Args:
        subject: URI of the entity to display
    """
    change_tracking_config = get_change_tracking_config()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # Fetch the entity's change history plus provenance metadata, without
    # expanding related, merged, or reverse-related entities.
    agnostic_entity = AgnosticEntity(
        res=subject, config=change_tracking_config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)
    history = convert_to_rdflib_graphs(history, get_dataset_is_quadstore())

    is_deleted = False
    context_snapshot = None
    subject_classes = []
    highest_priority_class = None
    entity_shape = None

    if history.get(subject):
        sorted_timestamps = sorted(history[subject].keys())
        # Provenance record whose generation time matches the newest snapshot.
        latest_metadata = next(
            (
                meta
                for _, meta in provenance[subject].items()
                if meta["generatedAtTime"] == sorted_timestamps[-1]
            ),
            None,
        )

        # A non-empty invalidatedAtTime on the latest snapshot marks deletion.
        is_deleted = (
            latest_metadata
            and "invalidatedAtTime" in latest_metadata
            and latest_metadata["invalidatedAtTime"]
        )

        # For deleted entities this recovers class/shape from the
        # second-to-last snapshot; otherwise returns (None, None, None).
        context_snapshot, highest_priority_class, entity_shape = get_deleted_entity_context_info(
            is_deleted, sorted_timestamps, history, subject
        )

    grouped_triples = {}
    can_be_added = []
    can_be_deleted = []
    datatypes = {}
    mandatory_values = {}
    optional_values = {}
    valid_predicates = []
    data_graph = None

    if not is_deleted:
        data_graph = fetch_data_graph_for_subject(subject)

    # Check if entity exists - if no history and no data_graph, entity doesn't exist
    if not history.get(subject) and (not data_graph or len(data_graph) == 0):
        abort(404)

    if data_graph:
        # Use helper function to handle both Graph and Dataset correctly
        triples = list(get_triples_from_graph(data_graph, (None, None, None)))
        subject_classes = [o for s, p, o in get_triples_from_graph(data_graph, (URIRef(subject), RDF.type, None))]
        subject_triples = list(get_triples_from_graph(data_graph, (URIRef(subject), None, None)))

        highest_priority_class = get_highest_priority_class(subject_classes)
        entity_shape = determine_shape_for_entity_triples(subject_triples)

        (
            can_be_added,
            can_be_deleted,
            datatypes,
            mandatory_values,
            optional_values,
            valid_predicates,
        ) = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

        grouped_triples, relevant_properties = get_grouped_triples(
            subject, triples, valid_predicates, highest_priority_class=highest_priority_class, highest_priority_shape=entity_shape
        )

        virtual_properties = get_virtual_properties_for_entity(highest_priority_class, entity_shape)

        # Keep only display-relevant predicates, then append virtual
        # property URIs (vp[0]) so they are offered in the editing UI too.
        can_be_added = [uri for uri in can_be_added if uri in relevant_properties] + [vp[0] for vp in virtual_properties]
        can_be_deleted = [
            uri for uri in can_be_deleted if uri in relevant_properties
        ] + [vp[0] for vp in virtual_properties]

    update_form = UpdateTripleForm()

    form_fields = get_form_fields()

    datatype_options = get_datatype_options()

    # Index form-field definitions by (predicate, entity key, node shape)
    # for quick lookup when rendering the template.
    predicate_details_map = {}
    for entity_type_key, predicates in form_fields.items():
        for predicate_uri, details_list in predicates.items():
            for details in details_list:
                shape = details.get("nodeShape")
                key = (predicate_uri, entity_type_key, shape)
                predicate_details_map[key] = details

    return render_template(
        "entity/about.jinja",
        subject=subject,
        history=history,
        can_be_added=can_be_added,
        can_be_deleted=can_be_deleted,
        datatypes=datatypes,
        update_form=update_form,
        mandatory_values=mandatory_values,
        optional_values=optional_values,
        shacl=bool(len(get_shacl_graph())),
        grouped_triples=grouped_triples,
        display_rules=get_display_rules(),
        form_fields=form_fields,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        predicate_details_map=predicate_details_map,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        is_deleted=is_deleted,
        context=context_snapshot,
        default_primary_source=default_primary_source,
        datatype_options=datatype_options,
    )

263 

264 

@entity_bp.route("/create-entity", methods=["GET", "POST"])
@login_required
def create_entity():
    """
    Create a new entity in the dataset.

    GET renders the creation form. POST validates the submitted structured
    data and writes the entity via the Editor, taking one of two paths:
    with SHACL form fields the data is validated and processed through the
    ordered/unordered property helpers; without form fields the raw
    {type, value, datatype} property dicts are written directly.
    """
    form_fields = get_form_fields()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # Creatable classes, most important (highest priority) first.
    entity_class_shape_pairs = sorted(
        [
            entity_key
            for entity_key in form_fields.keys()
            if is_entity_type_visible(entity_key)
        ],
        key=lambda et: get_class_priority(et),
        reverse=True,
    )

    datatype_options = get_datatype_options()

    if request.method == "POST":
        structured_data = json.loads(request.form.get("structured_data", "{}"))
        primary_source = request.form.get("primary_source") or None
        save_default_source = request.form.get("save_default_source") == 'true'

        if primary_source and not validators.url(primary_source):
            return jsonify({"status": "error", "errors": [gettext("Invalid primary source URL provided")]}), 400

        # Optionally remember this primary source as the user's default.
        if save_default_source and primary_source and validators.url(primary_source):
            save_user_default_primary_source(current_user.orcid, primary_source)

        editor = Editor(
            get_dataset_endpoint(),
            get_provenance_endpoint(),
            current_app.config["COUNTER_HANDLER"],
            URIRef(get_responsible_agent_uri(current_user.orcid)),
            primary_source,
            current_app.config["DATASET_GENERATION_TIME"],
            dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        )

        if not structured_data.get("entity_type"):
            return jsonify({"status": "error", "errors": [gettext("Entity type is required")]}), 400

        # Prepare common data for entity creation
        cleaned_structured_data, entity_type, properties, entity_uri = _prepare_entity_creation_data(structured_data)

        # In a quadstore each entity gets its own named graph.
        default_graph_uri = (
            URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
        )

        if form_fields:
            # SHACL-driven path: validate against form fields first.
            validation_errors = validate_entity_data(cleaned_structured_data)
            if validation_errors:
                return jsonify({"status": "error", "errors": validation_errors}), 400

            _setup_editor_for_creation(editor, cleaned_structured_data)

            for predicate, values in properties.items():
                if not isinstance(values, list):
                    values = [values]

                entity_shape = cleaned_structured_data.get("entity_shape")
                matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

                field_definitions = form_fields.get(matching_key, {}).get(predicate, []) if matching_key else []

                # Get the shape from the property value if available
                property_shape = None
                if values and isinstance(values[0], dict):
                    property_shape = values[0].get("shape")

                # Filter field definitions to find the matching one based on shape
                matching_field_def = None
                for field_def in field_definitions:
                    if property_shape:
                        # If property has a shape, match it with the field definition's subjectShape
                        if field_def.get("subjectShape") == property_shape:
                            matching_field_def = field_def
                            break
                    else:
                        # If no shape specified, use the first field definition without a shape requirement
                        if not field_def.get("subjectShape"):
                            matching_field_def = field_def
                            break

                # If no matching field definition found, use the first one (default behavior)
                if not matching_field_def and field_definitions:
                    matching_field_def = field_definitions[0]

                ordered_by = (
                    matching_field_def.get("orderedBy") if matching_field_def else None
                )

                if ordered_by:
                    process_ordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, ordered_by
                    )
                else:
                    # Handle unordered properties
                    process_unordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, matching_field_def
                    )
        else:
            # No SHACL form fields: write the raw property dicts directly.
            # NOTE(review): import_entity is called on the freshly minted
            # URI before creation — presumably required by the Editor's
            # change tracking; confirm against Editor's contract.
            editor.import_entity(entity_uri)
            _setup_editor_for_creation(editor, cleaned_structured_data)

            editor.create(
                entity_uri,
                RDF.type,
                URIRef(entity_type),
                default_graph_uri,
            )

            for predicate, values in properties.items():
                for value_dict in values:
                    if value_dict["type"] == "uri":
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            URIRef(value_dict["value"]),
                            default_graph_uri,
                        )
                    elif value_dict["type"] == "literal":
                        # Missing datatype defaults to xsd:string.
                        datatype = (
                            URIRef(value_dict["datatype"])
                            if "datatype" in value_dict
                            else XSD.string
                        )
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            Literal(value_dict["value"], datatype=datatype),
                            default_graph_uri,
                        )

        try:
            # Save the main entity first
            editor.save()

            # Process virtual properties after creation
            _process_virtual_properties_after_creation(editor, structured_data, entity_uri, default_graph_uri)

            response = jsonify(
                {
                    "status": "success",
                    "redirect_url": url_for("entity.about", subject=str(entity_uri)),
                }
            )
            flash(gettext("Entity created successfully"), "success")
            return response, 200
        except Exception as e:
            error_message = gettext(
                "An error occurred while creating the entity: %(error)s", error=str(e)
            )
            return jsonify({"status": "error", "errors": [error_message]}), 500

    return render_template(
        "create_entity.jinja",
        datatype_options=datatype_options,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        default_primary_source=default_primary_source,
        shacl=bool(form_fields),
        entity_class_shape_pairs=entity_class_shape_pairs
    )

435 

436 

def create_nested_entity(
    editor: Editor, entity_uri, entity_data, graph_uri=None
):
    """
    Recursively record a nested entity and all of its property values
    through the editor.

    Dict values are dispatched in this order: new nested entities (carry
    an "entity_type", possibly via an intermediate relation), references
    to existing entities ("is_existing_entity"), and finally plain
    URI/literal values.
    """
    form_fields = get_form_fields()

    # Assert the entity's rdf:type first.
    editor.create(
        entity_uri,
        URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
        URIRef(entity_data["entity_type"]),
        graph_uri,
    )

    entity_type = entity_data.get("entity_type")
    entity_shape = entity_data.get("entity_shape")
    properties = entity_data.get("properties", {})

    matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)
    if not matching_key:
        # Without a matching form definition the remaining properties
        # cannot be interpreted, so only the rdf:type triple is written.
        return

    for predicate, raw_values in properties.items():
        value_list = raw_values if isinstance(raw_values, list) else [raw_values]
        field_definitions = form_fields[matching_key].get(predicate, [])

        for item in value_list:
            if isinstance(item, dict) and "entity_type" in item:
                if "intermediateRelation" in item:
                    # Chain: subject -> intermediate node -> target entity.
                    intermediate_uri = generate_unique_uri(
                        item["intermediateRelation"]["class"]
                    )
                    target_uri = generate_unique_uri(item["entity_type"])
                    editor.create(
                        entity_uri, URIRef(predicate), intermediate_uri, graph_uri
                    )
                    editor.create(
                        intermediate_uri,
                        URIRef(item["intermediateRelation"]["property"]),
                        target_uri,
                        graph_uri,
                    )
                    create_nested_entity(editor, target_uri, item, graph_uri)
                else:
                    # Directly nested entity.
                    nested_uri = generate_unique_uri(item["entity_type"])
                    editor.create(entity_uri, URIRef(predicate), nested_uri, graph_uri)
                    create_nested_entity(editor, nested_uri, item, graph_uri)
            elif isinstance(item, dict) and item.get("is_existing_entity", False):
                existing_uri = item.get("entity_uri")
                if existing_uri:
                    editor.create(entity_uri, URIRef(predicate), URIRef(existing_uri), graph_uri)
            else:
                # Plain value: a URL-shaped string becomes a URI, anything
                # else a typed literal (xsd:string when no datatype matches).
                if validators.url(str(item)):
                    object_value = URIRef(item)
                else:
                    datatype_uris = (
                        field_definitions[0].get("datatypes", [])
                        if field_definitions
                        else []
                    )
                    object_value = Literal(
                        item, datatype=determine_datatype(item, datatype_uris)
                    )
                editor.create(entity_uri, URIRef(predicate), object_value, graph_uri)

506 

507 

def process_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri, matching_field_def):
    """
    Record one value for *predicate* on *entity_uri*.

    Handles three cases: a dict describing a new nested entity, a dict
    referencing an already-existing entity, and a plain URI/literal value.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        value: Value to process (dict or primitive)
        default_graph_uri: Default graph URI for quad stores
        matching_field_def: Field definition for datatype validation

    Returns:
        The URIRef (or Literal, for simple values) linked to the entity.

    Raises:
        ValueError: If an existing-entity reference lacks "entity_uri".
    """
    value_is_dict = isinstance(value, dict)

    if value_is_dict and "entity_type" in value:
        # New nested entity: mint a URI, link it, then create it recursively.
        nested_uri = generate_unique_uri(value["entity_type"])
        editor.create(entity_uri, URIRef(predicate), nested_uri, default_graph_uri)
        create_nested_entity(editor, nested_uri, value, default_graph_uri)
        return nested_uri

    if value_is_dict and value.get("is_existing_entity", False):
        # Reference to an entity that already exists in the dataset.
        referenced = value.get("entity_uri")
        if not referenced:
            raise ValueError("Missing entity_uri in existing entity reference")
        object_value = URIRef(referenced)
        editor.create(entity_uri, URIRef(predicate), object_value, default_graph_uri)
        return object_value

    # Handle simple properties - check if it's a URI or literal
    if validators.url(str(value)):
        object_value = URIRef(value)
    else:
        datatype_uris = matching_field_def.get("datatypes", []) if matching_field_def else []
        object_value = Literal(value, datatype=determine_datatype(value, datatype_uris))
    editor.create(entity_uri, URIRef(predicate), object_value, default_graph_uri)
    return object_value

568 

569 

def process_ordered_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri):
    """
    Process a single entity value for ordered properties.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        value: Value to process (dict)
        default_graph_uri: Default graph URI for quad stores

    Returns:
        URIRef: The URI of the created/referenced entity

    Raises:
        ValueError: If an existing-entity reference lacks "entity_uri",
            or the value is not a recognized dict form.
    """
    if isinstance(value, dict) and "entity_type" in value:
        # New nested entity: mint a URI, link it, then create it recursively.
        nested_uri = generate_unique_uri(value["entity_type"])
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        create_nested_entity(
            editor,
            nested_uri,
            value,
            default_graph_uri
        )
        return nested_uri
    elif isinstance(value, dict) and value.get("is_existing_entity", False):
        # Reference to an existing entity.
        # BUG FIX: previously this built URIRef(value) from the whole dict,
        # which stringifies the dict into an invalid URI. Use the
        # "entity_uri" field, consistent with process_entity_value.
        existing_uri = value.get("entity_uri")
        if not existing_uri:
            raise ValueError("Missing entity_uri in existing entity reference")
        nested_uri = URIRef(existing_uri)
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        return nested_uri
    else:
        raise ValueError("Unexpected value type for ordered property")

611 

612 

def process_ordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, ordered_by):
    """
    Create ordered property values, chaining consecutive entities of the
    same shape with the *ordered_by* predicate.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        values: List of values to process
        default_graph_uri: Default graph URI for quad stores
        ordered_by: URI of the ordering property
    """
    # Group values by declared shape; insertion order within each group
    # (and across groups) follows the input order.
    buckets = {}
    for item in values:
        bucket_key = item.get("entity_shape") or "default_shape"
        buckets.setdefault(bucket_key, []).append(item)

    for bucket in buckets.values():
        predecessor = None
        for item in bucket:
            current_uri = process_ordered_entity_value(
                editor, entity_uri, predicate, item, default_graph_uri
            )
            # Link each entity to the one before it in the sequence.
            if predecessor:
                editor.create(
                    predecessor,
                    URIRef(ordered_by),
                    current_uri,
                    default_graph_uri,
                )
            predecessor = current_uri

650 

def process_unordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, matching_field_def):
    """
    Create each value of an unordered property independently.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        values: List of values to process
        default_graph_uri: Default graph URI for quad stores
        matching_field_def: Field definition for datatype validation
    """
    for item in values:
        process_entity_value(
            editor, entity_uri, predicate, item, default_graph_uri, matching_field_def
        )

667 

668 

def determine_datatype(value, datatype_uris):
    """
    Return the first candidate datatype URI whose validator accepts
    *value*, falling back to xsd:string when none matches.
    """
    for candidate in datatype_uris:
        # Look up the validation callable registered for this datatype.
        check = None
        for mapping in DATATYPE_MAPPING:
            if str(mapping[0]) == str(candidate):
                check = mapping[1]
                break
        if check and check(value):
            return URIRef(candidate)
    # Nothing validated: default to xsd:string.
    return XSD.string

678 

679 

def validate_entity_data(structured_data):
    """
    Validates entity data against form field definitions, considering shape matching.

    Nested entity dicts (those carrying an "entity_type") are validated
    recursively with their own form fields.

    Args:
        structured_data (dict): Data to validate containing entity_type and properties

    Returns:
        list: List of validation error messages, empty if validation passes
    """
    custom_filter = get_custom_filter()
    form_fields = get_form_fields()

    errors = []
    entity_type = structured_data.get("entity_type")
    entity_shape = structured_data.get("entity_shape")

    # Entity type is the minimum requirement; nothing else can be checked
    # without it.
    if not entity_type:
        errors.append(gettext("Entity type is required"))
        return errors

    entity_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    if not entity_key:
        errors.append(f"No form fields found for entity type: {entity_type}" +
                      (f" and shape: {entity_shape}" if entity_shape else ""))
        return errors

    entity_fields = form_fields[entity_key]
    properties = structured_data.get("properties", {})

    for prop_uri, prop_values in properties.items():
        # rdf:type is implied by entity_type and not validated here.
        if URIRef(prop_uri) == RDF.type:
            continue

        field_definitions = entity_fields.get(prop_uri)
        if not field_definitions:
            errors.append(
                gettext(
                    "Unknown property %(prop_uri)s for entity type %(entity_type)s",
                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                    entity_type=custom_filter.human_readable_class(entity_key),
                )
            )
            continue

        if not isinstance(prop_values, list):
            prop_values = [prop_values]

        # The shape declared on the first value (if any) selects which
        # field definition applies to this property.
        property_shape = None
        if prop_values and isinstance(prop_values[0], dict):
            property_shape = prop_values[0].get("shape")

        matching_field_def = None
        for field_def in field_definitions:
            if property_shape:
                if field_def.get("subjectShape") == property_shape:
                    matching_field_def = field_def
                    break
            else:
                if not field_def.get("subjectShape"):
                    matching_field_def = field_def
                    break

        # Fall back to the first definition when no shape-specific match exists.
        if not matching_field_def and field_definitions:
            matching_field_def = field_definitions[0]

        if matching_field_def:
            # Cardinality checks for properties present in the data.
            min_count = matching_field_def.get("min", 0)
            max_count = matching_field_def.get("max", None)
            value_count = len(prop_values)

            if value_count < min_count:
                value = gettext("values") if min_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s requires at least %(min_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        min_count=min_count,
                        value=value,
                    )
                )
            if max_count is not None and value_count > max_count:
                value = gettext("values") if max_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s allows at most %(max_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        max_count=max_count,
                        value=value,
                    )
                )

            # Values the schema says must be present.
            mandatory_values = matching_field_def.get("mandatory_values", [])
            for mandatory_value in mandatory_values:
                if mandatory_value not in prop_values:
                    errors.append(
                        gettext(
                            "Property %(prop_uri)s requires the value %(mandatory_value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            mandatory_value=mandatory_value,
                        )
                    )

            for value in prop_values:
                if isinstance(value, dict) and "entity_type" in value:
                    # Nested entity: validate recursively.
                    nested_errors = validate_entity_data(value)
                    errors.extend(nested_errors)
                else:
                    # Scalar value: check it against the declared datatypes.
                    datatypes = matching_field_def.get("datatypes", [])
                    if datatypes:
                        is_valid_datatype = False
                        for dtype in datatypes:
                            validation_func = next(
                                (
                                    d[1]
                                    for d in DATATYPE_MAPPING
                                    if d[0] == URIRef(dtype)
                                ),
                                None,
                            )
                            if validation_func and validation_func(value):
                                is_valid_datatype = True
                                break
                        if not is_valid_datatype:
                            expected_types = ", ".join(
                                [
                                    custom_filter.human_readable_predicate(dtype, entity_key)
                                    for dtype in datatypes
                                ]
                            )
                            errors.append(
                                gettext(
                                    'Value "%(value)s" for property %(prop_uri)s is not of expected type %(expected_types)s',
                                    value=value,
                                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                    expected_types=expected_types
                                )
                            )

                    # When the schema enumerates permitted values, the value
                    # must be one of them.
                    optional_values = matching_field_def.get("optionalValues", [])
                    if optional_values and value not in optional_values:
                        acceptable_values = ", ".join(
                            [
                                custom_filter.human_readable_predicate(val, entity_key)
                                for val in optional_values
                            ]
                        )
                        errors.append(
                            gettext(
                                'Value "%(value)s" is not permitted for property %(prop_uri)s. Acceptable values are: %(acceptable_values)s',
                                value=value,
                                prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                acceptable_values=acceptable_values
                            )
                        )

    # In the RDF model, a property with zero values is equivalent to the property being absent,
    # as a triple requires a subject, predicate, and object. Therefore, this section checks for
    # properties defined in the schema that are completely absent from the input data but are
    # required (min_count > 0). This complements the cardinality check above, which only
    # validates properties that are present in the data.
    # Check for missing required properties
    for prop_uri, field_definitions in entity_fields.items():
        if prop_uri not in properties:
            for field_def in field_definitions:
                min_count = field_def.get("min", 0)
                if min_count > 0:
                    value = gettext("values") if min_count > 1 else gettext("value")
                    errors.append(
                        gettext(
                            "Missing required property: %(prop_uri)s requires at least %(min_count)d %(value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            min_count=min_count,
                            value=value,
                        )
                    )
                    break  # Only need to report once per property

    return errors

860 

861 

@entity_bp.route("/entity-history/<path:entity_uri>")
@login_required
def entity_history(entity_uri):
    """
    Display the history of changes for an entity as a timeline.

    Builds one timeline event per provenance snapshot, each carrying the
    responsible agent, primary source, a human-readable description, and an
    HTML diff of the modifications recorded in the snapshot's update query.

    Args:
        entity_uri: URI of the entity
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)
    history = convert_to_rdflib_graphs(history, get_dataset_is_quadstore())

    # Snapshot metadata ordered by generation time; the stringified
    # timestamps double as the keys into the `history` dict.
    sorted_metadata = sorted(
        provenance[entity_uri].items(),
        key=lambda x: convert_to_datetime(x[1]["generatedAtTime"]),
    )
    sorted_timestamps = [
        convert_to_datetime(meta["generatedAtTime"], stringify=True)
        for _, meta in sorted_metadata
    ]

    # Get correct context for entity label: if the latest snapshot records a
    # deletion (it was invalidated), the entity's data lives in the
    # second-to-last snapshot, so use that one to resolve labels.
    latest_metadata = sorted_metadata[-1][1] if sorted_metadata else None
    is_latest_deletion = (
        latest_metadata
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    )
    if is_latest_deletion and len(sorted_timestamps) > 1:
        context_snapshot = history[entity_uri][sorted_timestamps[-2]]
    else:
        context_snapshot = history[entity_uri][sorted_timestamps[-1]]

    entity_classes = [str(triple[2]) for triple in get_triples_from_graph(context_snapshot, (URIRef(entity_uri), RDF.type, None))]
    highest_priority_class = get_highest_priority_class(entity_classes)

    snapshot_entity_shape = determine_shape_for_entity_triples(
        list(get_triples_from_graph(context_snapshot, (URIRef(entity_uri), None, None)))
    )

    # Generate timeline events
    events = []
    for i, (snapshot_uri, metadata) in enumerate(sorted_metadata):
        date = convert_to_datetime(metadata["generatedAtTime"])
        snapshot_timestamp_str = convert_to_datetime(
            metadata["generatedAtTime"], stringify=True
        )
        snapshot_graph = history[entity_uri][snapshot_timestamp_str]

        responsible_agent = custom_filter.format_agent_reference(
            metadata["wasAttributedTo"]
        )
        primary_source = custom_filter.format_source_reference(
            metadata["hadPrimarySource"]
        )

        description = _format_snapshot_description(
            metadata,
            entity_uri,
            highest_priority_class,
            context_snapshot,
            history,
            sorted_timestamps,
            i,
            custom_filter,
        )
        # Render the snapshot's SPARQL update (if any) as an HTML diff.
        modifications = metadata.get("hasUpdateQuery", "")
        modification_text = ""
        if modifications:
            parsed_modifications = parse_sparql_update(modifications)
            modification_text = generate_modification_text(
                parsed_modifications,
                highest_priority_class,
                snapshot_entity_shape,
                history=history,
                entity_uri=entity_uri,
                current_snapshot=snapshot_graph,
                current_snapshot_timestamp=snapshot_timestamp_str,
                custom_filter=custom_filter,
            )

        # Check if this version can be restored (not the latest version and there are multiple versions)
        can_restore = len(sorted_metadata) > 1 and i + 1 < len(sorted_metadata)
        restore_button = ""
        if can_restore:
            restore_button = f"""
                <form action='/restore-version/{entity_uri}/{metadata["generatedAtTime"]}' method='post' class='d-inline restore-form'>
                    <button type='submit' class='btn btn-success restore-btn'>
                        <i class='bi bi-arrow-counterclockwise me-1'></i>{gettext('Restore')}
                    </button>
                </form>
            """

        # Event payload in the shape expected by the timeline widget
        # rendered by entity/history.jinja.
        event = {
            "start_date": {
                "year": date.year,
                "month": date.month,
                "day": date.day,
                "hour": date.hour,
                "minute": date.minute,
                "second": date.second,
            },
            "text": {
                "headline": gettext("Snapshot") + " " + str(i + 1),
                "text": f"""
                    <p><strong>{gettext('Responsible agent')}:</strong> {responsible_agent}</p>
                    <p><strong>{gettext('Primary source')}:</strong> {primary_source}</p>
                    <p><strong>{gettext('Description')}:</strong> {description}</p>
                    <div class="modifications mb-3">
                        {modification_text}
                    </div>
                    <div class="d-flex gap-2 mt-2">
                        <a href='/entity-version/{entity_uri}/{metadata["generatedAtTime"]}' class='btn btn-outline-primary view-version' target='_self'>{gettext('View version')}</a>
                        {restore_button}
                    </div>
                """,
            },
            "autolink": False,
        }

        # Each snapshot's validity interval ends when the next one begins.
        if i + 1 < len(sorted_metadata):
            next_date = convert_to_datetime(
                sorted_metadata[i + 1][1]["generatedAtTime"]
            )
            event["end_date"] = {
                "year": next_date.year,
                "month": next_date.month,
                "day": next_date.day,
                "hour": next_date.hour,
                "minute": next_date.minute,
                "second": next_date.second,
            }

        events.append(event)

    entity_label = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, snapshot_entity_shape), context_snapshot
    )

    timeline_data = {
        "entityUri": entity_uri,
        "entityLabel": entity_label,
        "entityClasses": list(entity_classes),
        "entityShape": snapshot_entity_shape,
        "events": events,
    }

    return render_template("entity/history.jinja", timeline_data=timeline_data)

1016 

1017 

def _format_snapshot_description(
    metadata: dict,
    entity_uri: str,
    highest_priority_class: str,
    context_snapshot: Graph,
    history: dict,
    sorted_timestamps: list[str],
    current_index: int,
    custom_filter: Filter,
) -> str:
    """
    Format a snapshot description, replacing raw URIs with readable labels.

    A snapshot derived from more than one predecessor (its ``wasDerivedFrom``
    is a list with multiple entries) is treated as a merge snapshot: the URI
    quoted after "merged with" in the description is replaced by the merged
    entity's label, resolved against the *previous* snapshot — the merged
    entity no longer exists in the current one. The main entity's quoted URI
    is likewise replaced with its label.

    Args:
        metadata: The snapshot metadata dictionary.
        entity_uri: The URI of the main entity.
        highest_priority_class: The highest priority class for the entity.
        context_snapshot: The graph snapshot for context.
        history: The history dictionary containing snapshots.
        sorted_timestamps: Sorted list of snapshot timestamps.
        current_index: The index of the current snapshot in sorted_timestamps.
        custom_filter: The custom filter instance for formatting.

    Returns:
        The formatted description string.
    """
    description = metadata.get("description", "")

    # Multiple derivation sources identify a merge snapshot.
    was_derived_from = metadata.get("wasDerivedFrom")
    is_merge_snapshot = isinstance(was_derived_from, list) and len(was_derived_from) > 1

    if is_merge_snapshot:
        # Regex to find URI after "merged with", potentially enclosed in
        # straight or typographic single quotes, or none.
        match = re.search(r"merged with ['‘]?([^'’<>\s]+)['’]?", description)
        if match:
            potential_merged_uri = match.group(1)
            if validators.url(potential_merged_uri):
                merged_entity_uri_from_desc = potential_merged_uri
                if current_index > 0:
                    # Resolve the merged entity against the previous snapshot,
                    # where it still carried its triples.
                    previous_snapshot_timestamp = sorted_timestamps[current_index - 1]
                    previous_snapshot_graph = history.get(entity_uri, {}).get(
                        previous_snapshot_timestamp
                    )
                    if previous_snapshot_graph:
                        raw_merged_entity_classes = [
                            str(o)
                            for s, p, o in get_triples_from_graph(
                                previous_snapshot_graph,
                                (URIRef(merged_entity_uri_from_desc), RDF.type, None),
                            )
                        ]
                        highest_priority_merged_class = (
                            get_highest_priority_class(raw_merged_entity_classes)
                            if raw_merged_entity_classes
                            else None
                        )

                        shape = determine_shape_for_classes(raw_merged_entity_classes)
                        merged_entity_label = custom_filter.human_readable_entity(
                            merged_entity_uri_from_desc,
                            (highest_priority_merged_class, shape),
                            previous_snapshot_graph,
                        )
                        # Only substitute when a genuine label was found
                        # (i.e. not just the URI echoed back).
                        if (
                            merged_entity_label
                            and merged_entity_label != merged_entity_uri_from_desc
                        ):
                            description = description.replace(
                                match.group(0),
                                f"merged with '{merged_entity_label}'",
                            )

    shape = determine_shape_for_classes([highest_priority_class])
    entity_label_for_desc = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, shape), context_snapshot
    )
    if entity_label_for_desc and entity_label_for_desc != entity_uri:
        description = description.replace(
            f"'{entity_uri}'", f"'{entity_label_for_desc}'"
        )

    return description

1094 

1095 

@entity_bp.route("/entity-version/<path:entity_uri>/<timestamp>")
@login_required
def entity_version(entity_uri, timestamp):
    """
    Display a specific version of an entity.

    Args:
        entity_uri: URI of the entity
        timestamp: Timestamp of the version to display. Either an ISO-format
            datetime string, or a snapshot identifier whose generation time
            is looked up in the provenance store.
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    try:
        timestamp_dt = datetime.fromisoformat(timestamp)
    except ValueError:
        # Not an ISO timestamp: treat the path segment as a snapshot
        # identifier and ask the provenance store for its generation time.
        provenance_sparql = get_provenance_sparql()
        query_timestamp = f"""
            SELECT ?generation_time
            WHERE {{
                <{entity_uri}/prov/se/{timestamp}> <http://www.w3.org/ns/prov#generatedAtTime> ?generation_time.
            }}
        """
        provenance_sparql.setQuery(query_timestamp)
        provenance_sparql.setReturnFormat(JSON)
        try:
            generation_time = provenance_sparql.queryAndConvert()["results"][
                "bindings"
            ][0]["generation_time"]["value"]
        except IndexError:
            # No snapshot with that identifier exists.
            abort(404)
        timestamp = generation_time
        timestamp_dt = datetime.fromisoformat(generation_time)

    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)
    history = convert_to_rdflib_graphs(history, get_dataset_is_quadstore())
    main_entity_history = history.get(entity_uri, {})
    sorted_timestamps = sorted(
        main_entity_history.keys(), key=lambda t: convert_to_datetime(t)
    )

    if not sorted_timestamps:
        abort(404)

    # Pick the stored snapshot whose generation time is nearest to the
    # requested timestamp (the request need not match a snapshot exactly).
    closest_timestamp = min(
        sorted_timestamps,
        key=lambda t: abs(
            convert_to_datetime(t).astimezone() - timestamp_dt.astimezone()
        ),
    )

    version = main_entity_history[closest_timestamp]
    triples = list(get_triples_from_graph(version, (URIRef(entity_uri), None, None)))

    entity_metadata = provenance.get(entity_uri, {})
    closest_metadata = None
    min_time_diff = None

    latest_timestamp = max(sorted_timestamps)
    latest_metadata = None

    # Single pass over the provenance metadata: find both the record nearest
    # to the requested time and the record of the newest snapshot.
    for se_uri, meta in entity_metadata.items():
        meta_time = convert_to_datetime(meta["generatedAtTime"])
        time_diff = abs((meta_time - timestamp_dt).total_seconds())

        if closest_metadata is None or time_diff < min_time_diff:
            closest_metadata = meta
            min_time_diff = time_diff

        if meta["generatedAtTime"] == latest_timestamp:
            latest_metadata = meta

    if closest_metadata is None or latest_metadata is None:
        abort(404)

    # A deletion snapshot is the latest, invalidated one — or any snapshot
    # in which the entity has no triples at all.
    is_deletion_snapshot = (
        closest_timestamp == latest_timestamp
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    ) or len(triples) == 0

    # For a deletion snapshot, resolve labels/shapes against the previous
    # snapshot, where the entity's data still exists.
    context_version = version
    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        current_index = sorted_timestamps.index(closest_timestamp)
        if current_index > 0:
            context_version = main_entity_history[sorted_timestamps[current_index - 1]]

    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        subject_classes = [
            o
            for _, _, o in get_triples_from_graph(context_version, (URIRef(entity_uri), RDF.type, None))
        ]
    else:
        subject_classes = [
            o for _, _, o in get_triples_from_graph(version, (URIRef(entity_uri), RDF.type, None))
        ]

    highest_priority_class = get_highest_priority_class(subject_classes)

    entity_shape = determine_shape_for_entity_triples(
        list(get_triples_from_graph(context_version, (URIRef(entity_uri), None, None)))
    )

    _, _, _, _, _, valid_predicates = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

    grouped_triples, relevant_properties = get_grouped_triples(
        entity_uri,
        triples,
        valid_predicates,
        historical_snapshot=context_version,
        highest_priority_class=highest_priority_class,
        highest_priority_shape=entity_shape
    )

    # 1-based version number among the distinct snapshot times.
    snapshot_times = [
        convert_to_datetime(meta["generatedAtTime"])
        for meta in entity_metadata.values()
    ]
    snapshot_times = sorted(set(snapshot_times))
    version_number = snapshot_times.index(timestamp_dt) + 1

    # Neighbouring snapshots for the previous/next navigation links.
    next_snapshot_timestamp = None
    prev_snapshot_timestamp = None

    for snap_time in snapshot_times:
        if snap_time > timestamp_dt:
            next_snapshot_timestamp = snap_time.isoformat()
            break

    for snap_time in reversed(snapshot_times):
        if snap_time < timestamp_dt:
            prev_snapshot_timestamp = snap_time.isoformat()
            break

    # Render the snapshot's SPARQL update (if any) as an HTML diff.
    modifications = ""
    if closest_metadata.get("hasUpdateQuery"):
        sparql_query = closest_metadata["hasUpdateQuery"]
        parsed_modifications = parse_sparql_update(sparql_query)
        modifications = generate_modification_text(
            parsed_modifications,
            highest_priority_class,
            entity_shape,
            history,
            entity_uri,
            context_version,
            closest_timestamp,
            custom_filter,
        )

    try:
        current_index = sorted_timestamps.index(closest_timestamp)
    except ValueError:
        current_index = -1

    if closest_metadata.get("description"):
        formatted_description = _format_snapshot_description(
            closest_metadata,
            entity_uri,
            highest_priority_class,
            context_version,
            history,
            sorted_timestamps,
            current_index,
            custom_filter,
        )
        closest_metadata["description"] = formatted_description

    closest_timestamp = closest_metadata["generatedAtTime"]

    return render_template(
        "entity/version.jinja",
        subject=entity_uri,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        metadata={closest_timestamp: closest_metadata},
        timestamp=closest_timestamp,
        next_snapshot_timestamp=next_snapshot_timestamp,
        prev_snapshot_timestamp=prev_snapshot_timestamp,
        modifications=modifications,
        grouped_triples=grouped_triples,
        version_number=version_number,
        version=context_version,
    )

1282 

1283 

@entity_bp.route("/restore-version/<path:entity_uri>/<timestamp>", methods=["POST"])
@login_required
def restore_version(entity_uri, timestamp):
    """
    Restore an entity to a previous version.

    Computes the difference between the current state and the historical
    snapshot, applies the deletions and additions through an Editor (which
    records provenance), and marks deleted entities as restored.

    Args:
        entity_uri: URI of the entity to restore
        timestamp: Timestamp of the version to restore to
    """
    timestamp = convert_to_datetime(timestamp, stringify=True)
    change_tracking_config = get_change_tracking_config()

    # Get entity history
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)
    history = convert_to_rdflib_graphs(history, get_dataset_is_quadstore())

    historical_graph = history.get(entity_uri, {}).get(timestamp)
    if historical_graph is None:
        abort(404)

    current_graph = fetch_current_state_with_related_entities(provenance)

    # The entity is considered deleted when it has no triples in the
    # current state.
    is_deleted = len(list(get_triples_from_graph(current_graph, (URIRef(entity_uri), None, None)))) == 0

    triples_or_quads_to_delete, triples_or_quads_to_add = compute_graph_differences(
        current_graph, historical_graph
    )

    # Get all entities that need restoration
    entities_to_restore = get_entities_to_restore(
        triples_or_quads_to_delete, triples_or_quads_to_add, entity_uri
    )

    # Prepare snapshot information for all entities
    entity_snapshots = prepare_entity_snapshots(
        entities_to_restore, provenance, timestamp
    )

    # Create editor instance. A deleted entity has no usable source snapshot,
    # hence the None source in that case.
    editor = Editor(
        get_dataset_endpoint(),
        get_provenance_endpoint(),
        current_app.config["COUNTER_HANDLER"],
        URIRef(get_responsible_agent_uri(current_user.orcid)),
        None if is_deleted else entity_snapshots[entity_uri]["source"],
        current_app.config["DATASET_GENERATION_TIME"],
        dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
    )

    # Import current state into editor
    if get_dataset_is_quadstore():
        for quad in current_graph.quads():
            editor.g_set.add(quad)
    else:
        for triple in current_graph:
            editor.g_set.add(triple)

    editor.preexisting_finished()

    # Apply deletions (items are quads in a quadstore, triples otherwise)
    for item in triples_or_quads_to_delete:
        if len(item) == 4:
            editor.delete(item[0], item[1], item[2], item[3])
        else:
            editor.delete(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["restoration_source"] = (
                    entity_info["source"]
                )

    # Apply additions
    for item in triples_or_quads_to_add:
        if len(item) == 4:
            editor.create(item[0], item[1], item[2], item[3])
        else:
            editor.create(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["source"] = entity_info[
                    "source"
                ]

    # Handle main entity restoration if needed
    if is_deleted and entity_uri in entity_snapshots:
        editor.g_set.mark_as_restored(URIRef(entity_uri))
        source = entity_snapshots[entity_uri]["source"]
        editor.g_set.entity_index[URIRef(entity_uri)]["source"] = source

    try:
        editor.save()
        flash(gettext("Version restored successfully"), "success")
    except Exception as e:
        flash(
            gettext(
                "An error occurred while restoring the version: %(error)s", error=str(e)
            ),
            "error",
        )

    return redirect(url_for("entity.about", subject=entity_uri))

1397 

1398 

def compute_graph_differences(
    current_graph: Graph | Dataset, historical_graph: Graph | Dataset
):
    """
    Compute what must change for the current state to match a historical one.

    Args:
        current_graph: The entity's current state.
        historical_graph: The snapshot to restore to.

    Returns:
        A pair ``(to_delete, to_add)`` of statement sets — quads when the
        dataset is a quadstore, triples otherwise.
    """
    if get_dataset_is_quadstore():
        statements_now = set(current_graph.quads())
        statements_then = set(historical_graph.quads())
    else:
        anything = (None, None, None)
        statements_now = set(get_triples_from_graph(current_graph, anything))
        statements_then = set(get_triples_from_graph(historical_graph, anything))

    return statements_now - statements_then, statements_then - statements_now

1412 

1413 

def get_entities_to_restore(
    triples_or_quads_to_delete: set, triples_or_quads_to_add: set, main_entity_uri: str
) -> set:
    """
    Identify all entities that need to be restored based on the graph differences.

    Every URI appearing as subject or object of a changed statement (other
    than rdf:type assertions) is considered affected, alongside the main
    entity itself.

    Args:
        triples_or_quads_to_delete: Set of triples/quads to be deleted
        triples_or_quads_to_add: Set of triples/quads to be added
        main_entity_uri: URI of the main entity being restored

    Returns:
        Set of entity URIs that need to be restored
    """
    rdf_type = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
    affected = {main_entity_uri}

    for statement in list(triples_or_quads_to_delete) + list(triples_or_quads_to_add):
        # Class assertions do not reference other entities to restore.
        if str(statement[1]) == rdf_type:
            continue

        for candidate in (str(statement[0]), str(statement[2])):
            # Literals fail the URL check and are skipped automatically.
            if candidate != main_entity_uri and validators.url(candidate):
                affected.add(candidate)

    return affected

1442 

1443 

def prepare_entity_snapshots(
    entities_to_restore: set, provenance: dict, target_time: str
) -> dict:
    """
    Prepare snapshot information for all entities that need to be restored.

    Entities without provenance, or without any usable snapshot at or before
    the target time, are silently omitted from the result.

    Args:
        entities_to_restore: Set of entity URIs to process
        provenance: Dictionary containing provenance data for all entities
        target_time: Target restoration time

    Returns:
        Dictionary mapping entity URIs to their restoration information
        (``source`` snapshot URI and a ``needs_restore`` flag).
    """
    snapshots_info = {}

    for uri in entities_to_restore:
        if uri not in provenance:
            continue

        # Pick the snapshot to cite as the restoration source.
        source = find_appropriate_snapshot(provenance[uri], target_time)
        if not source:
            continue

        # An entity counts as currently deleted when its newest snapshot was
        # invalidated at the very moment it was generated.
        chronological = sorted(
            provenance[uri].items(),
            key=lambda pair: convert_to_datetime(pair[1]["generatedAtTime"]),
        )
        newest_meta = chronological[-1][1]
        currently_deleted = (
            newest_meta.get("invalidatedAtTime")
            and newest_meta["generatedAtTime"]
            == newest_meta["invalidatedAtTime"]
        )

        snapshots_info[uri] = {
            "source": source,
            "needs_restore": currently_deleted,
        }

    return snapshots_info

1487 

1488 

def find_appropriate_snapshot(provenance_data: dict, target_time: str) -> Optional[str]:
    """
    Find the most appropriate snapshot to use as a source for restoration.

    Considers only snapshots generated at or before the target time, ignoring
    deletion snapshots, and returns the most recent of them.

    Args:
        provenance_data: Dictionary of snapshots and their metadata for an entity
        target_time: The target restoration time as ISO format string

    Returns:
        The URI of the most appropriate snapshot, or None if no suitable snapshot is found
    """
    cutoff = convert_to_datetime(target_time)

    candidates = []
    for uri, meta in provenance_data.items():
        created = convert_to_datetime(meta["generatedAtTime"])

        # A snapshot whose generation and invalidation times coincide records
        # a deletion and cannot serve as a restoration source.
        invalidated = meta.get("invalidatedAtTime")
        if invalidated and meta["generatedAtTime"] == invalidated:
            continue

        # Only snapshots that already existed at the target time qualify.
        if created <= cutoff:
            candidates.append((created, uri))

    if not candidates:
        return None

    # Most recent qualifying snapshot wins.
    candidates.sort(key=lambda pair: pair[0])
    return candidates[-1][1]

1524 

1525 

def determine_object_class_and_shape(object_value: str, relevant_snapshot: Graph) -> tuple[Optional[str], Optional[str]]:
    """
    Determine the class and shape for an object value from a graph snapshot.

    Args:
        object_value: The object value (URI or literal)
        relevant_snapshot: Graph snapshot to query for object information

    Returns:
        Tuple of (object_class, object_shape_uri) or (None, None) if not determinable
    """
    # Literals have no class or shape; an absent snapshot gives no context.
    if not validators.url(str(object_value)) or not relevant_snapshot:
        return None, None

    subject_ref = URIRef(object_value)
    description = list(
        get_triples_from_graph(relevant_snapshot, (subject_ref, None, None))
    )
    if not description:
        return None, None

    shape_uri = determine_shape_for_entity_triples(description)

    type_uris = [
        str(obj)
        for _, _, obj in get_triples_from_graph(
            relevant_snapshot, (subject_ref, RDF.type, None)
        )
    ]
    best_class = get_highest_priority_class(type_uris) if type_uris else None

    return best_class, shape_uri

1554 

1555 

def generate_modification_text(
    modifications,
    highest_priority_class,
    entity_shape,
    history,
    entity_uri,
    current_snapshot,
    current_snapshot_timestamp,
    custom_filter: Filter,
) -> str:
    """
    Generate HTML text describing modifications to an entity, using display rules for property ordering.

    Args:
        modifications (dict): Dictionary of modifications from parse_sparql_update
        highest_priority_class (str): The highest priority class for the subject entity
        entity_shape (str): The shape for the subject entity
        history (dict): Historical snapshots dictionary
        entity_uri (str): URI of the entity being modified
        current_snapshot (Graph): Current entity snapshot
        current_snapshot_timestamp (str): Timestamp of current snapshot
        custom_filter (Filter): Filter instance for formatting

    Returns:
        str: HTML text describing the modifications
    """
    modification_text = "<p><strong>" + gettext("Modifications") + "</strong></p>"

    ordered_properties = get_property_order_from_rules(highest_priority_class, entity_shape)

    for mod_type, triples in modifications.items():
        # One list per modification type (additions / deletions), with an
        # icon matching the type.
        modification_text += "<ul class='list-group mb-3'><p>"
        if mod_type == gettext("Additions"):
            modification_text += '<i class="bi bi-plus-circle-fill text-success"></i>'
        elif mod_type == gettext("Deletions"):
            modification_text += '<i class="bi bi-dash-circle-fill text-danger"></i>'
        modification_text += " <em>" + gettext(mod_type) + "</em></p>"

        object_shapes_cache = {}
        object_classes_cache = {}

        # Deleted objects no longer exist in the current snapshot, so their
        # labels/shapes are resolved against the previous snapshot instead.
        relevant_snapshot = None
        if (
            mod_type == gettext("Deletions")
            and history
            and entity_uri
            and current_snapshot_timestamp
        ):
            sorted_timestamps = sorted(history[entity_uri].keys())
            current_index = sorted_timestamps.index(current_snapshot_timestamp)
            if current_index > 0:
                relevant_snapshot = history[entity_uri][
                    sorted_timestamps[current_index - 1]
                ]
        else:
            relevant_snapshot = current_snapshot

        # Pre-compute class and shape for every object in one pass.
        if relevant_snapshot:
            for triple in triples:
                object_value = triple[2]
                object_class, object_shape = determine_object_class_and_shape(object_value, relevant_snapshot)
                object_classes_cache[str(object_value)] = object_class
                object_shapes_cache[str(object_value)] = object_shape

        # Group triples by (predicate, object shape) and, for predicates with
        # a configured ordering property, cache each object's sequence position.
        predicate_shape_groups = {}
        predicate_ordering_cache = {}
        entity_position_cache = {}

        for triple in triples:
            predicate = str(triple[1])
            object_value = str(triple[2])
            object_shape_uri = object_shapes_cache.get(object_value)

            if predicate not in predicate_ordering_cache:
                predicate_ordering_cache[predicate] = get_predicate_ordering_info(predicate, highest_priority_class, entity_shape)

            order_property = predicate_ordering_cache[predicate]
            if order_property and validators.url(object_value) and relevant_snapshot:
                position_key = (object_value, predicate)
                if position_key not in entity_position_cache:
                    entity_position_cache[position_key] = get_entity_position_in_sequence(
                        object_value, entity_uri, predicate, order_property, relevant_snapshot
                    )

            group_key = (predicate, object_shape_uri)
            if group_key not in predicate_shape_groups:
                predicate_shape_groups[group_key] = []
            predicate_shape_groups[group_key].append(triple)

        processed_predicates = set()

        def get_cached_position(triple, predicate_uri):
            # Sort key: objects without a cached position sort last.
            object_value = str(triple[2])
            position_key = (object_value, predicate_uri)
            if position_key in entity_position_cache:
                return entity_position_cache[position_key]
            return float('inf')

        # First emit groups whose predicate appears in the display rules, in
        # the configured property order, sub-ordered by object shape priority.
        for predicate in ordered_properties:
            shape_order = get_shape_order_from_display_rules(highest_priority_class, entity_shape, predicate)
            predicate_groups = []
            for group_key, group_triples in predicate_shape_groups.items():
                predicate_uri, object_shape_uri = group_key
                if predicate_uri == predicate:
                    if object_shape_uri and object_shape_uri in shape_order:
                        shape_priority = shape_order.index(object_shape_uri)
                    else:
                        # Objects without shapes or shapes not in display rules go at the end
                        shape_priority = len(shape_order)

                    predicate_groups.append((shape_priority, group_key, group_triples))

            predicate_groups.sort(key=lambda x: x[0])
            for _, group_key, group_triples in predicate_groups:
                processed_predicates.add(group_key)

                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                # For ordered predicates, render triples in sequence order.
                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        # Then handle any remaining predicate+shape groups not in the ordered list
        for group_key, group_triples in predicate_shape_groups.items():
            if group_key not in processed_predicates:
                # Sort remaining triples by their cached positions too
                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        modification_text += "</ul>"

    return modification_text

1719 

1720 

def format_triple_modification(
    triple: Tuple[URIRef, URIRef, URIRef|Literal],
    highest_priority_class: str,
    entity_shape: str,
    object_shapes_cache: dict,
    object_classes_cache: dict,
    relevant_snapshot: Optional[Graph],
    custom_filter: Filter,
    subject_uri: str = None,
    predicate_ordering_cache: Optional[dict] = None,
    entity_position_cache: Optional[dict] = None,
) -> str:
    """
    Render one added/removed triple as an HTML list item.

    Args:
        triple: The RDF triple being modified
        highest_priority_class: The highest priority class for the subject entity
        entity_shape: The shape for the subject entity
        object_shapes_cache: Pre-computed mapping of object value to shape URI
        object_classes_cache: Pre-computed mapping of object value to class URI
        relevant_snapshot: Graph snapshot for context
        custom_filter (Filter): Filter instance for formatting
        subject_uri: URI of the subject entity (enables the order badge)
        predicate_ordering_cache: Mapping of predicate to its ordering property
        entity_position_cache: Mapping of (object, predicate) to sequence position

    Returns:
        str: HTML text describing the modification
    """
    _, predicate, object_value = triple
    object_key = str(object_value)

    shape_of_object = object_shapes_cache.get(object_key)

    predicate_label = custom_filter.human_readable_predicate(
        predicate,
        (highest_priority_class, entity_shape),
        object_shape_uri=shape_of_object,
    )

    object_label = get_object_label(
        object_value,
        predicate,
        shape_of_object,
        object_classes_cache.get(object_key),
        relevant_snapshot,
        custom_filter,
        subject_entity_key=(highest_priority_class, entity_shape),
    )

    # Append a "#N" badge when the predicate carries an ordered sequence and
    # this object's position within it is known.
    order_info = ""
    if subject_uri and validators.url(object_key):
        if predicate_ordering_cache and entity_position_cache:
            if predicate_ordering_cache.get(str(predicate)):
                position = entity_position_cache.get((object_key, str(predicate)))
                if position is not None:
                    order_info = f' <span class="order-position-badge">#{position}</span>'

    return f"""
        <li class='d-flex align-items-center'>
            <span class='flex-grow-1 d-flex flex-column justify-content-center ms-3 mb-2 w-100'>
                <strong>{predicate_label}{order_info}</strong>
                <span class="object-value word-wrap">{object_label}</span>
            </span>
        </li>"""

1786 

1787 

def get_object_label(
    object_value: str,
    predicate: str,
    object_shape_uri: Optional[str],
    object_class: Optional[str],
    snapshot: Optional[Graph],
    custom_filter: Filter,
    subject_entity_key: Optional[tuple] = None,
) -> str:
    """
    Resolve a human-readable label for an object value.

    Args:
        object_value: The value to get a label for
        predicate: The predicate URI
        object_shape_uri: Pre-computed shape URI for the object
        object_class: Pre-computed class for the object
        snapshot: Graph snapshot for context (essential for deleted triples)
        custom_filter (Filter): Custom filter instance for formatting
        subject_entity_key: Tuple of (class, shape) for the subject entity

    Returns:
        str: A human-readable label for the object value
    """
    predicate_str = str(predicate)

    # rdf:type objects are rendered as the label of the subject's class.
    if predicate_str == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
        return custom_filter.human_readable_class(subject_entity_key)

    # A URL with a known shape or class gets an entity label; any other
    # value (including URLs with no shape/class info) falls back to its
    # plain string form.
    if validators.url(object_value) and (object_shape_uri or object_class):
        return custom_filter.human_readable_entity(
            object_value, (object_class, object_shape_uri), snapshot
        )

    return str(object_value)

1826 

1827 

def process_modification_data(data: dict) -> Tuple[str, List[dict]]:
    """
    Process modification data to extract subjects and predicates.

    Args:
        data: Dictionary containing modification data

    Returns:
        Tuple containing subject URI and list of modification details

    Raises:
        ValueError: If the subject URI or the modification list is missing.
    """
    subject_uri = data.get("subject")
    modifications = data.get("modifications", [])

    if not subject_uri:
        raise ValueError("No subject URI provided in modification data")
    if not modifications:
        raise ValueError("No modifications provided in data")

    return subject_uri, modifications

1847 

1848 

def validate_modification(
    modification: dict, subject_uri: str
) -> Tuple[bool, str]:
    """
    Validate a single modification operation.

    Args:
        modification: Dictionary containing modification details
        subject_uri: URI of the subject being modified

    Returns:
        Tuple of (is_valid, error_message)
    """
    form_fields = get_form_fields()
    operation = modification.get("operation")
    if not operation:
        return False, "No operation specified in modification"

    predicate = modification.get("predicate")
    if not predicate:
        return False, "No predicate specified in modification"

    if operation not in ["add", "remove", "update"]:
        return False, f"Invalid operation: {operation}"

    if form_fields:
        entity_type = modification.get("entity_type")
        entity_shape = modification.get("entity_shape")

        # If entity_type is not provided in modification, get it from the database
        if not entity_type:
            entity_types = get_entity_types(subject_uri)
            if entity_types:
                entity_type = get_highest_priority_class(entity_types)

        matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

        if matching_key:
            predicate_fields = form_fields[matching_key].get(predicate, [])

            # The predicate count is loop-invariant and each call issues a
            # SPARQL query, so compute it once (only when needed for "add")
            # instead of once per field.
            current_count = None
            if operation == "add" and predicate_fields:
                current_count = get_predicate_count(subject_uri, predicate)

            for field in predicate_fields:
                if operation == "remove" and field.get("minCount", 0) > 0:
                    return False, f"Cannot remove required predicate: {predicate}"

                if operation == "add":
                    max_count = field.get("maxCount")

                    if max_count and current_count >= max_count:
                        return (
                            False,
                            f"Maximum count exceeded for predicate: {predicate}",
                        )

    return True, ""

1904 

1905 

def get_predicate_count(subject_uri: str, predicate: str) -> int:
    """
    Get the current count of values for a predicate.

    Args:
        subject_uri: URI of the entity
        predicate: Predicate URI to count

    Returns:
        Number of values for the predicate
    """
    # Count every object attached to the subject via this predicate.
    count_query = f"""
    SELECT (COUNT(?o) as ?count) WHERE {{
        <{subject_uri}> <{predicate}> ?o .
    }}
    """

    client = get_sparql()
    client.setQuery(count_query)
    client.setReturnFormat(JSON)
    response = client.query().convert()

    binding = response["results"]["bindings"][0]
    return int(binding["count"]["value"])

1930 

1931 

def apply_modifications(
    editor: Editor,
    modifications: List[dict],
    subject_uri: str,
    graph_uri: Optional[str] = None,
):
    """
    Apply a list of modifications to an entity.

    Each modification dict carries an "operation" key ("remove", "add" or
    "update") plus the operation-specific payload keys used below.

    Args:
        editor: Editor instance to use for modifications
        modifications: List of modification operations
        subject_uri: URI of the entity being modified
        graph_uri: Optional graph URI for quad store
    """
    for mod in modifications:
        operation = mod["operation"]
        predicate = mod["predicate"]

        if operation == "remove":
            # No specific object is passed — presumably this removes all
            # values of the predicate on the subject; confirm against Editor.delete.
            editor.delete(URIRef(subject_uri), URIRef(predicate), graph_uri=graph_uri)

        elif operation == "add":
            value = mod["value"]
            datatype = mod.get("datatype", XSD.string)

            # URL-shaped values become resource references; anything else is
            # stored as a literal typed with the supplied (or default) datatype.
            if validators.url(value):
                object_value = URIRef(value)
            else:
                object_value = Literal(value, datatype=URIRef(datatype))

            editor.create(
                URIRef(subject_uri), URIRef(predicate), object_value, graph_uri
            )

        elif operation == "update":
            old_value = mod["oldValue"]
            new_value = mod["newValue"]
            # NOTE(review): the single "datatype" is applied to both the old
            # and the new literal — assumes an update never changes datatype;
            # confirm with callers.
            datatype = mod.get("datatype", XSD.string)

            if validators.url(old_value):
                old_object = URIRef(old_value)
            else:
                old_object = Literal(old_value, datatype=URIRef(datatype))

            if validators.url(new_value):
                new_object = URIRef(new_value)
            else:
                new_object = Literal(new_value, datatype=URIRef(datatype))

            editor.update(
                URIRef(subject_uri),
                URIRef(predicate),
                old_object,
                new_object,
                graph_uri,
            )