Coverage for heritrace/routes/entity.py: 90%

761 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-08-01 22:12 +0000

1import json 

2import re 

3from datetime import datetime 

4from typing import List, Optional, Tuple 

5 

6import validators 

7from flask import (Blueprint, abort, current_app, flash, jsonify, redirect, 

8 render_template, request, url_for) 

9from flask_babel import gettext 

10from flask_login import current_user, login_required 

11from heritrace.apis.orcid import get_responsible_agent_uri 

12from heritrace.editor import Editor 

13from heritrace.extensions import (get_change_tracking_config, 

14 get_custom_filter, get_dataset_endpoint, 

15 get_dataset_is_quadstore, get_display_rules, 

16 get_form_fields, get_provenance_endpoint, 

17 get_provenance_sparql, get_shacl_graph, 

18 get_sparql) 

19from heritrace.forms import * 

20from heritrace.utils.converters import convert_to_datetime 

21from heritrace.utils.datatypes import DATATYPE_MAPPING, get_datatype_options 

22from heritrace.utils.display_rules_utils import ( 

23 get_class_priority, get_grouped_triples, get_highest_priority_class, 

24 get_predicate_ordering_info, get_property_order_from_rules, 

25 get_shape_order_from_display_rules, is_entity_type_visible) 

26from heritrace.utils.filters import Filter 

27from heritrace.utils.primary_source_utils import ( 

28 get_default_primary_source, save_user_default_primary_source) 

29from heritrace.utils.shacl_utils import (determine_shape_for_entity_triples, 

30 find_matching_form_field, 

31 get_entity_position_in_sequence) 

32from heritrace.utils.shacl_validation import get_valid_predicates 

33from heritrace.utils.sparql_utils import ( 

34 determine_shape_for_classes, fetch_current_state_with_related_entities, 

35 fetch_data_graph_for_subject, get_entity_types, import_referenced_entities, 

36 parse_sparql_update) 

37from heritrace.utils.uri_utils import generate_unique_uri 

38from rdflib import RDF, XSD, ConjunctiveGraph, Graph, Literal, URIRef 

39from SPARQLWrapper import JSON 

40from time_agnostic_library.agnostic_entity import AgnosticEntity 

41 

# Blueprint grouping the entity routes defined in this module
# (/about, /create-entity, /entity-history, ...).
entity_bp = Blueprint("entity", __name__)

43 

44 

def get_deleted_entity_context_info(is_deleted: bool, sorted_timestamps: List[str],
                                    history: dict, subject: str) -> Tuple[Optional[Graph], Optional[str], Optional[str]]:
    """
    Extract context information for a deleted entity that has more than one timestamp.

    For such entities the snapshot preceding the deletion (second-to-last
    timestamp) is used as context, and the entity's highest-priority class
    and matching shape are derived from that snapshot.

    Args:
        is_deleted: Whether the entity is deleted
        sorted_timestamps: Timestamps in chronological order
        history: Mapping subject -> timestamp -> Graph
        subject: The entity URI as a string

    Returns:
        (context_snapshot, highest_priority_class, entity_shape), or
        (None, None, None) when the entity is not deleted or has fewer
        than two timestamps.
    """
    # Guard: nothing to extract unless the entity is deleted AND a
    # pre-deletion snapshot exists.
    if not is_deleted or len(sorted_timestamps) < 2:
        return None, None, None

    snapshot = history[subject][sorted_timestamps[-2]]
    subject_ref = URIRef(subject)

    type_objects = [
        obj for _, _, obj in snapshot.triples((subject_ref, RDF.type, None))
    ]
    priority_class = get_highest_priority_class(type_objects)
    shape = determine_shape_for_entity_triples(
        list(snapshot.triples((subject_ref, None, None)))
    )

    return snapshot, priority_class, shape

82 

83 

@entity_bp.route("/about/<path:subject>")
@login_required
def about(subject):
    """
    Display detailed information about an entity.

    Reconstructs the entity's history, determines whether its latest snapshot
    is a deletion, and (for live entities) loads the current data graph to
    compute which predicates can be added/deleted, their datatypes and
    mandatory/optional values, before rendering the about page.

    Args:
        subject: URI of the entity to display
    """
    change_tracking_config = get_change_tracking_config()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # Full snapshot history plus provenance metadata for this subject only
    # (related/merged/reverse entities excluded).
    agnostic_entity = AgnosticEntity(
        res=subject, config=change_tracking_config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    is_deleted = False
    context_snapshot = None
    subject_classes = []
    highest_priority_class = None
    entity_shape = None

    if history.get(subject):
        sorted_timestamps = sorted(history[subject].keys())
        # Provenance record whose generation time matches the newest snapshot.
        latest_metadata = next(
            (
                meta
                for _, meta in provenance[subject].items()
                if meta["generatedAtTime"] == sorted_timestamps[-1]
            ),
            None,
        )

        # NOTE: this is truthy-valued (the invalidation timestamp or
        # None/False), not a strict bool; downstream code only tests truthiness.
        is_deleted = (
            latest_metadata
            and "invalidatedAtTime" in latest_metadata
            and latest_metadata["invalidatedAtTime"]
        )

        # For deleted entities, fall back to the pre-deletion snapshot to
        # derive class/shape context; returns (None, None, None) otherwise.
        context_snapshot, highest_priority_class, entity_shape = get_deleted_entity_context_info(
            is_deleted, sorted_timestamps, history, subject
        )

    grouped_triples = {}
    can_be_added = []
    can_be_deleted = []
    datatypes = {}
    mandatory_values = {}
    optional_values = {}
    valid_predicates = []
    data_graph = None

    if not is_deleted:
        data_graph = fetch_data_graph_for_subject(subject)
        if data_graph:
            triples = list(data_graph.triples((None, None, None)))
            subject_classes = [o for s, p, o in data_graph.triples((URIRef(subject), RDF.type, None))]

            highest_priority_class = get_highest_priority_class(subject_classes)
            entity_shape = determine_shape_for_entity_triples(
                list(data_graph.triples((URIRef(subject), None, None)))
            )

            # SHACL-driven predicate constraints for the entity's class.
            (
                can_be_added,
                can_be_deleted,
                datatypes,
                mandatory_values,
                optional_values,
                valid_predicates,
            ) = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

            grouped_triples, relevant_properties = get_grouped_triples(
                subject, triples, valid_predicates, highest_priority_class=highest_priority_class, highest_priority_shape=entity_shape
            )

            # Only offer add/delete for properties actually shown on the page.
            can_be_added = [uri for uri in can_be_added if uri in relevant_properties]
            can_be_deleted = [
                uri for uri in can_be_deleted if uri in relevant_properties
            ]

    update_form = UpdateTripleForm()

    form_fields = get_form_fields()

    datatype_options = get_datatype_options()

    # Index form-field details by (predicate, entity-type key, node shape) so
    # the template can look them up without re-scanning form_fields.
    predicate_details_map = {}
    for entity_type_key, predicates in form_fields.items():
        for predicate_uri, details_list in predicates.items():
            for details in details_list:
                shape = details.get("nodeShape")
                key = (predicate_uri, entity_type_key, shape)
                predicate_details_map[key] = details

    return render_template(
        "entity/about.jinja",
        subject=subject,
        history=history,
        can_be_added=can_be_added,
        can_be_deleted=can_be_deleted,
        datatypes=datatypes,
        update_form=update_form,
        mandatory_values=mandatory_values,
        optional_values=optional_values,
        shacl=bool(len(get_shacl_graph())),
        grouped_triples=grouped_triples,
        display_rules=get_display_rules(),
        form_fields=form_fields,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        predicate_details_map=predicate_details_map,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        is_deleted=is_deleted,
        context=context_snapshot,
        default_primary_source=default_primary_source,
        datatype_options=datatype_options,
    )

207 

208 

@entity_bp.route("/create-entity", methods=["GET", "POST"])
@login_required
def create_entity():
    """
    Create a new entity in the dataset.

    GET renders the creation form. POST expects a JSON-encoded
    ``structured_data`` form field plus an optional ``primary_source`` URL;
    when SHACL form fields are configured the payload is validated and
    written property-by-property (ordered vs. unordered handling), otherwise
    the raw type/property values are written directly.

    Returns:
        GET: rendered create_entity template.
        POST: JSON {status, redirect_url} on success (200), or
        {status, errors} with 400 (validation) / 500 (save failure).
    """
    form_fields = get_form_fields()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # Creatable (class, shape) keys, most important class first.
    entity_class_shape_pairs = sorted(
        [
            entity_key
            for entity_key in form_fields.keys()
            if is_entity_type_visible(entity_key)
        ],
        key=lambda et: get_class_priority(et),
        reverse=True,
    )

    datatype_options = get_datatype_options()

    if request.method == "POST":
        structured_data = json.loads(request.form.get("structured_data", "{}"))
        primary_source = request.form.get("primary_source") or None
        save_default_source = request.form.get("save_default_source") == 'true'

        if primary_source and not validators.url(primary_source):
            return jsonify({"status": "error", "errors": [gettext("Invalid primary source URL provided")]}), 400

        # Persist the validated source as the user's default when requested.
        if save_default_source and primary_source and validators.url(primary_source):
            save_user_default_primary_source(current_user.orcid, primary_source)

        editor = Editor(
            get_dataset_endpoint(),
            get_provenance_endpoint(),
            current_app.config["COUNTER_HANDLER"],
            URIRef(get_responsible_agent_uri(current_user.orcid)),
            primary_source,
            current_app.config["DATASET_GENERATION_TIME"],
            dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        )

        if form_fields:
            # SHACL-backed path: validate against the form-field schema first.
            validation_errors = validate_entity_data(structured_data)
            if validation_errors:
                return jsonify({"status": "error", "errors": validation_errors}), 400

            entity_type = structured_data.get("entity_type")
            properties = structured_data.get("properties", {})

            entity_uri = generate_unique_uri(entity_type)

            import_referenced_entities(editor, structured_data)

            editor.preexisting_finished()

            # On quadstores each entity gets its own named graph.
            default_graph_uri = (
                URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
            )

            for predicate, values in properties.items():
                if not isinstance(values, list):
                    values = [values]

                # NOTE(review): entity_shape/matching_key are loop-invariant
                # and could be hoisted above the loop.
                entity_shape = structured_data.get("entity_shape")
                matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

                field_definitions = form_fields.get(matching_key, {}).get(predicate, []) if matching_key else []

                # Get the shape from the property value if available
                property_shape = None
                if values and isinstance(values[0], dict):
                    property_shape = values[0].get("shape")

                # Filter field definitions to find the matching one based on shape
                matching_field_def = None
                for field_def in field_definitions:
                    if property_shape:
                        # If property has a shape, match it with the field definition's subjectShape
                        if field_def.get("subjectShape") == property_shape:
                            matching_field_def = field_def
                            break
                    else:
                        # If no shape specified, use the first field definition without a shape requirement
                        if not field_def.get("subjectShape"):
                            matching_field_def = field_def
                            break

                # If no matching field definition found, use the first one (default behavior)
                if not matching_field_def and field_definitions:
                    matching_field_def = field_definitions[0]

                ordered_by = (
                    matching_field_def.get("orderedBy") if matching_field_def else None
                )

                if ordered_by:
                    # Ordered collection: values are chained via the ordering property.
                    process_ordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, ordered_by
                    )
                else:
                    # Handle unordered properties
                    process_unordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, matching_field_def
                    )
        else:
            # No SHACL form fields configured: write the payload as-is.
            entity_type = structured_data.get("entity_type")
            properties = structured_data.get("properties", {})

            entity_uri = generate_unique_uri(entity_type)
            editor.import_entity(entity_uri)

            import_referenced_entities(editor, structured_data)

            editor.preexisting_finished()

            default_graph_uri = (
                URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
            )

            editor.create(
                entity_uri,
                RDF.type,
                URIRef(entity_type),
                default_graph_uri,
            )

            for predicate, values in properties.items():
                for value_dict in values:
                    if value_dict["type"] == "uri":
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            URIRef(value_dict["value"]),
                            default_graph_uri,
                        )
                    elif value_dict["type"] == "literal":
                        # Default to xsd:string when no datatype is supplied.
                        datatype = (
                            URIRef(value_dict["datatype"])
                            if "datatype" in value_dict
                            else XSD.string
                        )
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            Literal(value_dict["value"], datatype=datatype),
                            default_graph_uri,
                        )

        try:
            editor.save()
            response = jsonify(
                {
                    "status": "success",
                    "redirect_url": url_for("entity.about", subject=str(entity_uri)),
                }
            )
            flash(gettext("Entity created successfully"), "success")
            return response, 200
        except Exception as e:
            error_message = gettext(
                "An error occurred while creating the entity: %(error)s", error=str(e)
            )
            return jsonify({"status": "error", "errors": [error_message]}), 500

    return render_template(
        "create_entity.jinja",
        form_fields=form_fields,
        datatype_options=datatype_options,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        default_primary_source=default_primary_source,
        shacl=bool(get_form_fields()),
        entity_class_shape_pairs=entity_class_shape_pairs
    )

387 

388 

def create_nested_entity(
    editor: Editor, entity_uri, entity_data, graph_uri=None
):
    """
    Recursively write a nested entity and its properties via the editor.

    Asserts the rdf:type triple first; if no form-field definition matches the
    entity's (type, shape) pair, only that triple is written. Otherwise each
    property value is handled as a nested entity (optionally through an
    intermediate relation), a reference to an existing entity, or a simple
    URI/literal value.

    Args:
        editor: Editor instance used to record triples
        entity_uri: URI assigned to the entity being created
        entity_data: Dict with entity_type, entity_shape and properties
        graph_uri: Optional named-graph URI for quadstores
    """
    form_fields = get_form_fields()

    # Type triple is always written, even without a matching form field.
    editor.create(
        entity_uri,
        URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
        URIRef(entity_data["entity_type"]),
        graph_uri,
    )

    entity_type = entity_data.get("entity_type")
    entity_shape = entity_data.get("entity_shape")
    properties = entity_data.get("properties", {})

    matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)
    if not matching_key:
        return

    # Add other properties
    for predicate, raw_values in properties.items():
        values = raw_values if isinstance(raw_values, list) else [raw_values]
        field_definitions = form_fields[matching_key].get(predicate, [])
        predicate_ref = URIRef(predicate)

        for value in values:
            if isinstance(value, dict) and "entity_type" in value:
                if "intermediateRelation" in value:
                    # subject --predicate--> intermediate --relation--> target
                    intermediate_uri = generate_unique_uri(
                        value["intermediateRelation"]["class"]
                    )
                    target_uri = generate_unique_uri(value["entity_type"])
                    editor.create(entity_uri, predicate_ref, intermediate_uri, graph_uri)
                    editor.create(
                        intermediate_uri,
                        URIRef(value["intermediateRelation"]["property"]),
                        target_uri,
                        graph_uri,
                    )
                    create_nested_entity(editor, target_uri, value, graph_uri)
                else:
                    # Plain nested entity: mint a URI, link it, recurse.
                    nested_uri = generate_unique_uri(value["entity_type"])
                    editor.create(entity_uri, predicate_ref, nested_uri, graph_uri)
                    create_nested_entity(editor, nested_uri, value, graph_uri)
            elif isinstance(value, dict) and value.get("is_existing_entity", False):
                # Reference to an already-existing entity; silently skipped
                # when no URI is provided.
                existing_entity_uri = value.get("entity_uri")
                if existing_entity_uri:
                    editor.create(entity_uri, predicate_ref, URIRef(existing_entity_uri), graph_uri)
            else:
                # Simple value: URI reference when it parses as a URL,
                # otherwise a typed literal.
                if validators.url(str(value)):
                    object_value = URIRef(value)
                else:
                    if field_definitions:
                        datatype_uris = field_definitions[0].get("datatypes", [])
                        datatype = determine_datatype(value, datatype_uris)
                    else:
                        # No field definition: fall back to xsd:string.
                        datatype = XSD.string
                    object_value = Literal(value, datatype=datatype)
                editor.create(entity_uri, predicate_ref, object_value, graph_uri)

458 

459 

def process_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri, matching_field_def):
    """
    Write a single property value, dispatching on its kind.

    Dict values carrying "entity_type" become new nested entities (written
    recursively); dicts flagged "is_existing_entity" become links to an
    existing URI; anything else is written as a URI (when it parses as a URL)
    or as a typed literal.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        value: Value to process (dict or primitive)
        default_graph_uri: Default graph URI for quad stores
        matching_field_def: Field definition for datatype validation

    Returns:
        URIRef: The URI of the created/referenced entity, or the literal written.

    Raises:
        ValueError: when an existing-entity reference lacks "entity_uri".
    """
    predicate_ref = URIRef(predicate)

    if isinstance(value, dict) and "entity_type" in value:
        # New nested entity: mint a URI, link it, then recurse into it.
        child_uri = generate_unique_uri(value["entity_type"])
        editor.create(entity_uri, predicate_ref, child_uri, default_graph_uri)
        create_nested_entity(editor, child_uri, value, default_graph_uri)
        return child_uri

    if isinstance(value, dict) and value.get("is_existing_entity", False):
        existing = value.get("entity_uri")
        if not existing:
            raise ValueError("Missing entity_uri in existing entity reference")
        target = URIRef(existing)
        editor.create(entity_uri, predicate_ref, target, default_graph_uri)
        return target

    # Handle simple properties - check if it's a URI or literal
    if validators.url(str(value)):
        target = URIRef(value)
    else:
        allowed_datatypes = matching_field_def.get("datatypes", []) if matching_field_def else []
        target = Literal(value, datatype=determine_datatype(value, allowed_datatypes))
    editor.create(entity_uri, predicate_ref, target, default_graph_uri)
    return target

520 

521 

def process_ordered_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri):
    """
    Process a single entity value for ordered properties.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        value: Value to process (dict)
        default_graph_uri: Default graph URI for quad stores

    Returns:
        URIRef: The URI of the created/referenced entity

    Raises:
        ValueError: when an existing-entity reference lacks "entity_uri",
            or when the value is not a recognized dict payload.
    """
    if isinstance(value, dict) and "entity_type" in value:
        # New nested entity: mint a URI, link it, then recurse into it.
        nested_uri = generate_unique_uri(value["entity_type"])
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        create_nested_entity(
            editor,
            nested_uri,
            value,
            default_graph_uri
        )
        return nested_uri
    elif isinstance(value, dict) and value.get("is_existing_entity", False):
        # Reference to an existing entity.
        # BUG FIX: the previous code called URIRef(value) on the whole dict
        # (the branch condition guarantees value is a dict), yielding a
        # garbage URI built from the dict's repr. Read the "entity_uri" key
        # instead, mirroring process_entity_value.
        existing_uri = value.get("entity_uri")
        if not existing_uri:
            raise ValueError("Missing entity_uri in existing entity reference")
        nested_uri = URIRef(existing_uri)
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        return nested_uri
    else:
        raise ValueError("Unexpected value type for ordered property")

563 

564 

def process_ordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, ordered_by):
    """
    Write ordered property values, grouped by shape, chaining each group.

    Values are bucketed by their "entity_shape" (a shared "default_shape"
    bucket when absent); within each bucket every entity is linked to its
    successor via the ordering property.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        values: List of values to process
        default_graph_uri: Default graph URI for quad stores
        ordered_by: URI of the ordering property
    """
    # Bucket values by declared shape, preserving input order.
    buckets = {}
    for item in values:
        bucket_key = item.get("entity_shape") or "default_shape"
        buckets.setdefault(bucket_key, []).append(item)

    # Within each bucket, chain consecutive entities via ordered_by.
    for bucket_items in buckets.values():
        prev_uri = None
        for item in bucket_items:
            current_uri = process_ordered_entity_value(
                editor, entity_uri, predicate, item, default_graph_uri
            )
            if prev_uri:
                editor.create(
                    prev_uri,
                    URIRef(ordered_by),
                    current_uri,
                    default_graph_uri,
                )
            prev_uri = current_uri

601 

602 

def process_unordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, matching_field_def):
    """
    Write each value of an unordered property via process_entity_value.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        values: List of values to process
        default_graph_uri: Default graph URI for quad stores
        matching_field_def: Field definition for datatype validation
    """
    for item in values:
        process_entity_value(
            editor, entity_uri, predicate, item, default_graph_uri, matching_field_def
        )

619 

620 

def determine_datatype(value, datatype_uris):
    """
    Return the first datatype URI whose validator accepts *value*.

    Each candidate URI is looked up in DATATYPE_MAPPING (entries are
    (uri, validation_func, ...)); the first candidate whose validator
    accepts the value wins. Falls back to xsd:string when nothing matches.
    """
    for candidate in datatype_uris:
        wanted = str(candidate)
        for entry in DATATYPE_MAPPING:
            if str(entry[0]) == wanted:
                # Only the first mapping entry for this URI is consulted.
                if entry[1] and entry[1](value):
                    return URIRef(candidate)
                break
    # If none match, default to XSD.string
    return XSD.string

630 

631 

def validate_entity_data(structured_data):
    """
    Validates entity data against form field definitions, considering shape matching.

    Checks, in order: entity_type presence, existence of a matching form-field
    key for (entity_type, entity_shape), then per-property cardinality
    (min/max), mandatory values, per-value datatype validity and allowed
    (optional) values — recursing into nested entity dicts — and finally
    required properties that are entirely absent from the input.

    Args:
        structured_data (dict): Data to validate containing entity_type and properties

    Returns:
        list: List of validation error messages, empty if validation passes
    """
    custom_filter = get_custom_filter()
    form_fields = get_form_fields()

    errors = []
    entity_type = structured_data.get("entity_type")
    entity_shape = structured_data.get("entity_shape")

    if not entity_type:
        errors.append(gettext("Entity type is required"))
        return errors

    entity_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    if not entity_key:
        errors.append(f"No form fields found for entity type: {entity_type}" +
                      (f" and shape: {entity_shape}" if entity_shape else ""))
        return errors

    entity_fields = form_fields[entity_key]
    properties = structured_data.get("properties", {})

    for prop_uri, prop_values in properties.items():
        # rdf:type is asserted separately by the caller, not validated here.
        if URIRef(prop_uri) == RDF.type:
            continue

        field_definitions = entity_fields.get(prop_uri)
        if not field_definitions:
            errors.append(
                gettext(
                    "Unknown property %(prop_uri)s for entity type %(entity_type)s",
                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                    entity_type=custom_filter.human_readable_class(entity_key),
                )
            )
            continue

        if not isinstance(prop_values, list):
            prop_values = [prop_values]

        # The first value's "shape" (if any) selects which field definition
        # applies to this property.
        property_shape = None
        if prop_values and isinstance(prop_values[0], dict):
            property_shape = prop_values[0].get("shape")

        matching_field_def = None
        for field_def in field_definitions:
            if property_shape:
                # Shaped values must match the definition's subjectShape.
                if field_def.get("subjectShape") == property_shape:
                    matching_field_def = field_def
                    break
            else:
                # Unshaped values take the first definition with no shape requirement.
                if not field_def.get("subjectShape"):
                    matching_field_def = field_def
                    break

        # Fallback: first definition when no shape-based match was found.
        if not matching_field_def and field_definitions:
            matching_field_def = field_definitions[0]

        if matching_field_def:
            # Cardinality checks for the values actually supplied.
            min_count = matching_field_def.get("min", 0)
            max_count = matching_field_def.get("max", None)
            value_count = len(prop_values)

            if value_count < min_count:
                value = gettext("values") if min_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s requires at least %(min_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        min_count=min_count,
                        value=value,
                    )
                )
            if max_count is not None and value_count > max_count:
                value = gettext("values") if max_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s allows at most %(max_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        max_count=max_count,
                        value=value,
                    )
                )

            # Values the schema declares as required for this property.
            mandatory_values = matching_field_def.get("mandatory_values", [])
            for mandatory_value in mandatory_values:
                if mandatory_value not in prop_values:
                    errors.append(
                        gettext(
                            "Property %(prop_uri)s requires the value %(mandatory_value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            mandatory_value=mandatory_value,
                        )
                    )

            for value in prop_values:
                if isinstance(value, dict) and "entity_type" in value:
                    # Nested entity payload: validate it recursively.
                    nested_errors = validate_entity_data(value)
                    errors.extend(nested_errors)
                else:
                    # Datatype check: the value must satisfy at least one of
                    # the declared datatypes' validation functions.
                    datatypes = matching_field_def.get("datatypes", [])
                    if datatypes:
                        is_valid_datatype = False
                        for dtype in datatypes:
                            validation_func = next(
                                (
                                    d[1]
                                    for d in DATATYPE_MAPPING
                                    if d[0] == URIRef(dtype)
                                ),
                                None,
                            )
                            if validation_func and validation_func(value):
                                is_valid_datatype = True
                                break
                        if not is_valid_datatype:
                            expected_types = ", ".join(
                                [
                                    custom_filter.human_readable_predicate(dtype, entity_key)
                                    for dtype in datatypes
                                ]
                            )
                            errors.append(
                                gettext(
                                    'Value "%(value)s" for property %(prop_uri)s is not of expected type %(expected_types)s',
                                    value=value,
                                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                    expected_types=expected_types
                                )
                            )

                    # Closed value list: when optionalValues is non-empty,
                    # only those values are accepted.
                    optional_values = matching_field_def.get("optionalValues", [])
                    if optional_values and value not in optional_values:
                        acceptable_values = ", ".join(
                            [
                                custom_filter.human_readable_predicate(val, entity_key)
                                for val in optional_values
                            ]
                        )
                        errors.append(
                            gettext(
                                'Value "%(value)s" is not permitted for property %(prop_uri)s. Acceptable values are: %(acceptable_values)s',
                                value=value,
                                prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                acceptable_values=acceptable_values
                            )
                        )

    # In the RDF model, a property with zero values is equivalent to the property being absent,
    # as a triple requires a subject, predicate, and object. Therefore, this section checks for
    # properties defined in the schema that are completely absent from the input data but are
    # required (min_count > 0). This complements the cardinality check above, which only
    # validates properties that are present in the data.
    # Check for missing required properties
    for prop_uri, field_definitions in entity_fields.items():
        if prop_uri not in properties:
            for field_def in field_definitions:
                min_count = field_def.get("min", 0)
                if min_count > 0:
                    value = gettext("values") if min_count > 1 else gettext("value")
                    errors.append(
                        gettext(
                            "Missing required property: %(prop_uri)s requires at least %(min_count)d %(value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            min_count=min_count,
                            value=value,
                        )
                    )
                    break  # Only need to report once per property

    return errors

812 

813 

@entity_bp.route("/entity-history/<path:entity_uri>")
@login_required
def entity_history(entity_uri):
    """
    Display the history of changes for an entity.

    Rebuilds all snapshots (including related/merged/reverse entities), picks
    the snapshot to use as label context (the pre-deletion one when the latest
    snapshot is a deletion), and builds a timeline event per snapshot with
    agent, source, description, parsed modifications and view/restore links.

    Args:
        entity_uri: URI of the entity
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    # Provenance records ordered chronologically by generation time.
    sorted_metadata = sorted(
        provenance[entity_uri].items(),
        key=lambda x: convert_to_datetime(x[1]["generatedAtTime"]),
    )
    sorted_timestamps = [
        convert_to_datetime(meta["generatedAtTime"], stringify=True)
        for _, meta in sorted_metadata
    ]

    # Get correct context for entity label
    # NOTE(review): sorted_timestamps[-1] / history[entity_uri] will raise if
    # the entity has no provenance — presumably guaranteed upstream; confirm.
    latest_metadata = sorted_metadata[-1][1] if sorted_metadata else None
    is_latest_deletion = (
        latest_metadata
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    )
    if is_latest_deletion and len(sorted_timestamps) > 1:
        context_snapshot = history[entity_uri][sorted_timestamps[-2]]
    else:
        context_snapshot = history[entity_uri][sorted_timestamps[-1]]

    entity_classes = [str(triple[2]) for triple in context_snapshot.triples((URIRef(entity_uri), RDF.type, None))]
    highest_priority_class = get_highest_priority_class(entity_classes)

    snapshot_entity_shape = determine_shape_for_entity_triples(
        list(context_snapshot.triples((URIRef(entity_uri), None, None)))
    )

    # Generate timeline events
    events = []
    for i, (snapshot_uri, metadata) in enumerate(sorted_metadata):
        date = convert_to_datetime(metadata["generatedAtTime"])
        snapshot_timestamp_str = convert_to_datetime(
            metadata["generatedAtTime"], stringify=True
        )
        snapshot_graph = history[entity_uri][snapshot_timestamp_str]

        responsible_agent = custom_filter.format_agent_reference(
            metadata["wasAttributedTo"]
        )
        primary_source = custom_filter.format_source_reference(
            metadata["hadPrimarySource"]
        )

        # NOTE(review): _format_snapshot_description is annotated as returning
        # Tuple[str, bool] but its result is interpolated directly into the
        # HTML below — confirm it actually returns a plain string.
        description = _format_snapshot_description(
            metadata,
            entity_uri,
            highest_priority_class,
            context_snapshot,
            history,
            sorted_timestamps,
            i,
            custom_filter,
        )
        modifications = metadata.get("hasUpdateQuery", "")
        modification_text = ""
        if modifications:
            # Turn the stored SPARQL UPDATE into a human-readable diff.
            parsed_modifications = parse_sparql_update(modifications)
            modification_text = generate_modification_text(
                parsed_modifications,
                highest_priority_class,
                snapshot_entity_shape,
                history=history,
                entity_uri=entity_uri,
                current_snapshot=snapshot_graph,
                current_snapshot_timestamp=snapshot_timestamp_str,
                custom_filter=custom_filter,
            )

        # Check if this version can be restored (not the latest version and there are multiple versions)
        can_restore = len(sorted_metadata) > 1 and i + 1 < len(sorted_metadata)
        restore_button = ""
        if can_restore:
            restore_button = f"""
                <form action='/restore-version/{entity_uri}/{metadata["generatedAtTime"]}' method='post' class='d-inline restore-form'>
                    <button type='submit' class='btn btn-success restore-btn'>
                        <i class='bi bi-arrow-counterclockwise me-1'></i>{gettext('Restore')}
                    </button>
                </form>
            """

        # Event structure consumed by the timeline widget in history.jinja.
        event = {
            "start_date": {
                "year": date.year,
                "month": date.month,
                "day": date.day,
                "hour": date.hour,
                "minute": date.minute,
                "second": date.second,
            },
            "text": {
                "headline": gettext("Snapshot") + " " + str(i + 1),
                "text": f"""
                    <p><strong>{gettext('Responsible agent')}:</strong> {responsible_agent}</p>
                    <p><strong>{gettext('Primary source')}:</strong> {primary_source}</p>
                    <p><strong>{gettext('Description')}:</strong> {description}</p>
                    <div class="modifications mb-3">
                        {modification_text}
                    </div>
                    <div class="d-flex gap-2 mt-2">
                        <a href='/entity-version/{entity_uri}/{metadata["generatedAtTime"]}' class='btn btn-outline-primary view-version' target='_self'>{gettext('View version')}</a>
                        {restore_button}
                    </div>
                """,
            },
            "autolink": False,
        }

        # Each snapshot's validity interval ends where the next one begins.
        if i + 1 < len(sorted_metadata):
            next_date = convert_to_datetime(
                sorted_metadata[i + 1][1]["generatedAtTime"]
            )
            event["end_date"] = {
                "year": next_date.year,
                "month": next_date.month,
                "day": next_date.day,
                "hour": next_date.hour,
                "minute": next_date.minute,
                "second": next_date.second,
            }

        events.append(event)

    entity_label = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, snapshot_entity_shape), context_snapshot
    )

    timeline_data = {
        "entityUri": entity_uri,
        "entityLabel": entity_label,
        "entityClasses": list(entity_classes),
        "entityShape": snapshot_entity_shape,
        "events": events,
    }

    return render_template("entity/history.jinja", timeline_data=timeline_data)

967 

968 

def _format_snapshot_description(
    metadata: dict,
    entity_uri: str,
    highest_priority_class: str,
    context_snapshot: Graph,
    history: dict,
    sorted_timestamps: list[str],
    current_index: int,
    custom_filter: Filter,
) -> str:
    """
    Format a snapshot description, replacing raw URIs with human-readable labels.

    A snapshot derived from more than one prior snapshot is treated as a merge
    snapshot: the URI quoted after "merged with" in its description is resolved
    against the previous snapshot's graph and replaced with the merged entity's
    label when one is available. The main entity's quoted URI is likewise
    replaced with its label.

    Args:
        metadata: The snapshot metadata dictionary.
        entity_uri: The URI of the main entity.
        highest_priority_class: The highest priority class for the entity.
        context_snapshot: The graph snapshot for context.
        history: The history dictionary containing snapshots.
        sorted_timestamps: Sorted list of snapshot timestamps.
        current_index: The index of the current snapshot in sorted_timestamps.
        custom_filter: The custom filter instance for formatting.

    Returns:
        The formatted description string.
    """
    description = metadata.get("description", "")

    # More than one wasDerivedFrom entry means this snapshot merged entities.
    was_derived_from = metadata.get('wasDerivedFrom')
    is_merge_snapshot = isinstance(was_derived_from, list) and len(was_derived_from) > 1

    if is_merge_snapshot:
        # Regex to find URI after "merged with", potentially enclosed in single quotes or none
        match = re.search(r"merged with ['‘]?([^'’<>\s]+)['’]?", description)
        if match:
            potential_merged_uri = match.group(1)
            if validators.url(potential_merged_uri):
                merged_entity_uri_from_desc = potential_merged_uri
                merged_entity_label = None
                # The merged entity only exists in the snapshot *before* the
                # merge, so resolve its label against the previous graph.
                if current_index > 0:
                    previous_snapshot_timestamp = sorted_timestamps[current_index - 1]
                    previous_snapshot_graph = history.get(entity_uri, {}).get(previous_snapshot_timestamp)
                    if previous_snapshot_graph:
                        raw_merged_entity_classes = [
                            str(o)
                            for s, p, o in previous_snapshot_graph.triples(
                                (URIRef(merged_entity_uri_from_desc), RDF.type, None)
                            )
                        ]
                        highest_priority_merged_class = get_highest_priority_class(
                            raw_merged_entity_classes
                        ) if raw_merged_entity_classes else None

                        shape = determine_shape_for_classes(raw_merged_entity_classes)
                        merged_entity_label = custom_filter.human_readable_entity(
                            merged_entity_uri_from_desc,
                            (highest_priority_merged_class, shape),
                            previous_snapshot_graph,
                        )
                        # Only substitute when the filter produced a real label
                        # (not just the URI echoed back).
                        if (
                            merged_entity_label
                            and merged_entity_label != merged_entity_uri_from_desc
                        ):
                            description = description.replace(
                                match.group(0), f"merged with '{merged_entity_label}'"
                            )

    # Replace the main entity's quoted URI with its label, when one exists.
    shape = determine_shape_for_classes([highest_priority_class])
    entity_label_for_desc = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, shape), context_snapshot
    )
    if entity_label_for_desc and entity_label_for_desc != entity_uri:
        description = description.replace(f"'{entity_uri}'", f"'{entity_label_for_desc}'")

    return description

1045 

1046 

@entity_bp.route("/entity-version/<path:entity_uri>/<timestamp>")
@login_required
def entity_version(entity_uri: str, timestamp: str):
    """
    Display a specific version of an entity.

    The timestamp may be either an ISO datetime or a snapshot identifier; in
    the latter case the generation time is resolved from the provenance store.
    The snapshot whose generation time is closest to the requested timestamp
    is rendered.

    Args:
        entity_uri: URI of the entity
        timestamp: Timestamp of the version to display
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    try:
        timestamp_dt = datetime.fromisoformat(timestamp)
    except ValueError:
        # Not an ISO datetime: treat it as a snapshot number and look up the
        # actual generation time in the provenance store.
        provenance_sparql = get_provenance_sparql()
        query_timestamp = f"""
            SELECT ?generation_time
            WHERE {{
                <{entity_uri}/prov/se/{timestamp}> <http://www.w3.org/ns/prov#generatedAtTime> ?generation_time.
            }}
        """
        provenance_sparql.setQuery(query_timestamp)
        provenance_sparql.setReturnFormat(JSON)
        try:
            generation_time = provenance_sparql.queryAndConvert()["results"][
                "bindings"
            ][0]["generation_time"]["value"]
        except IndexError:
            # No such snapshot in provenance.
            abort(404)
        timestamp = generation_time
        timestamp_dt = datetime.fromisoformat(generation_time)

    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)
    main_entity_history = history.get(entity_uri, {})
    # Timestamps are strings; sort them chronologically via datetime parsing.
    sorted_timestamps = sorted(
        main_entity_history.keys(), key=lambda t: convert_to_datetime(t)
    )

    if not sorted_timestamps:
        abort(404)

    # Pick the snapshot closest in time to the requested timestamp.
    closest_timestamp = min(
        sorted_timestamps,
        key=lambda t: abs(
            convert_to_datetime(t).astimezone() - timestamp_dt.astimezone()
        ),
    )

    version = main_entity_history[closest_timestamp]
    triples = list(version.triples((URIRef(entity_uri), None, None)))

    entity_metadata = provenance.get(entity_uri, {})
    closest_metadata = None
    min_time_diff = None

    # NOTE(review): max() compares timestamp strings lexicographically here,
    # unlike the datetime-based sort above — confirm all history keys share a
    # uniform ISO format so both orderings agree.
    latest_timestamp = max(sorted_timestamps)
    latest_metadata = None

    # Find the provenance record closest to the requested time, and remember
    # the record belonging to the latest snapshot.
    for se_uri, meta in entity_metadata.items():
        meta_time = convert_to_datetime(meta["generatedAtTime"])
        time_diff = abs((meta_time - timestamp_dt).total_seconds())

        if closest_metadata is None or time_diff < min_time_diff:
            closest_metadata = meta
            min_time_diff = time_diff

        if meta["generatedAtTime"] == latest_timestamp:
            latest_metadata = meta

    if closest_metadata is None or latest_metadata is None:
        abort(404)

    # A deletion snapshot is the invalidated latest snapshot, or any snapshot
    # in which the entity has no triples left.
    is_deletion_snapshot = (
        closest_timestamp == latest_timestamp
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    ) or len(triples) == 0

    # For a deletion snapshot, render labels/context from the snapshot just
    # before the deletion, where the entity still had data.
    context_version = version
    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        current_index = sorted_timestamps.index(closest_timestamp)
        if current_index > 0:
            context_version = main_entity_history[sorted_timestamps[current_index - 1]]

    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        subject_classes = [
            o
            for _, _, o in context_version.triples((URIRef(entity_uri), RDF.type, None))
        ]
    else:
        subject_classes = [
            o for _, _, o in version.triples((URIRef(entity_uri), RDF.type, None))
        ]

    highest_priority_class = get_highest_priority_class(subject_classes)

    entity_shape = determine_shape_for_entity_triples(
        list(context_version.triples((URIRef(entity_uri), None, None)))
    )

    _, _, _, _, _, valid_predicates = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

    grouped_triples, relevant_properties = get_grouped_triples(
        entity_uri,
        triples,
        valid_predicates,
        historical_snapshot=context_version,
        highest_priority_class=highest_priority_class,
        highest_priority_shape=entity_shape
    )

    # 1-based version number of the requested timestamp among all snapshots.
    snapshot_times = [
        convert_to_datetime(meta["generatedAtTime"])
        for meta in entity_metadata.values()
    ]
    snapshot_times = sorted(set(snapshot_times))
    version_number = snapshot_times.index(timestamp_dt) + 1

    # Neighbouring snapshots for prev/next navigation links.
    next_snapshot_timestamp = None
    prev_snapshot_timestamp = None

    for snap_time in snapshot_times:
        if snap_time > timestamp_dt:
            next_snapshot_timestamp = snap_time.isoformat()
            break

    for snap_time in reversed(snapshot_times):
        if snap_time < timestamp_dt:
            prev_snapshot_timestamp = snap_time.isoformat()
            break

    # Render the SPARQL update of this snapshot as human-readable HTML.
    modifications = ""
    if closest_metadata.get("hasUpdateQuery"):
        sparql_query = closest_metadata["hasUpdateQuery"]
        parsed_modifications = parse_sparql_update(sparql_query)
        modifications = generate_modification_text(
            parsed_modifications,
            highest_priority_class,
            entity_shape,
            history,
            entity_uri,
            context_version,
            closest_timestamp,
            custom_filter,
        )

    try:
        current_index = sorted_timestamps.index(closest_timestamp)
    except ValueError:
        current_index = -1

    # Humanize URIs mentioned in the snapshot description, in place.
    if closest_metadata.get("description"):
        formatted_description = _format_snapshot_description(
            closest_metadata,
            entity_uri,
            highest_priority_class,
            context_version,
            history,
            sorted_timestamps,
            current_index,
            custom_filter,
        )
        closest_metadata["description"] = formatted_description

    closest_timestamp = closest_metadata["generatedAtTime"]

    return render_template(
        "entity/version.jinja",
        subject=entity_uri,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        metadata={closest_timestamp: closest_metadata},
        timestamp=closest_timestamp,
        next_snapshot_timestamp=next_snapshot_timestamp,
        prev_snapshot_timestamp=prev_snapshot_timestamp,
        modifications=modifications,
        grouped_triples=grouped_triples,
        version_number=version_number,
        version=context_version,
    )

1232 

1233 

@entity_bp.route("/restore-version/<path:entity_uri>/<timestamp>", methods=["POST"])
@login_required
def restore_version(entity_uri: str, timestamp: str):
    """
    Restore an entity to a previous version.

    Computes the difference between the current graph state and the historical
    snapshot, applies it through the Editor (which records provenance), and
    redirects back to the entity page with a flash message.

    Args:
        entity_uri: URI of the entity to restore
        timestamp: Timestamp of the version to restore to
    """
    # Normalize to the canonical timestamp string used as a history key.
    timestamp = convert_to_datetime(timestamp, stringify=True)
    change_tracking_config = get_change_tracking_config()

    # Get entity history
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    historical_graph = history.get(entity_uri, {}).get(timestamp)
    if historical_graph is None:
        abort(404)

    current_graph = fetch_current_state_with_related_entities(provenance)

    # No triples in the current state means the entity was deleted.
    is_deleted = len(list(current_graph.triples((URIRef(entity_uri), None, None)))) == 0

    triples_or_quads_to_delete, triples_or_quads_to_add = compute_graph_differences(
        current_graph, historical_graph
    )

    # Get all entities that need restoration
    entities_to_restore = get_entities_to_restore(
        triples_or_quads_to_delete, triples_or_quads_to_add, entity_uri
    )

    # Prepare snapshot information for all entities
    entity_snapshots = prepare_entity_snapshots(
        entities_to_restore, provenance, timestamp
    )

    # Create editor instance
    editor = Editor(
        get_dataset_endpoint(),
        get_provenance_endpoint(),
        current_app.config["COUNTER_HANDLER"],
        URIRef(get_responsible_agent_uri(current_user.orcid)),
        # For a deleted entity the primary source is attached per-entity below.
        None if is_deleted else entity_snapshots[entity_uri]["source"],
        current_app.config["DATASET_GENERATION_TIME"],
        dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
    )

    # Import current state into editor
    if get_dataset_is_quadstore():
        for quad in current_graph.quads():
            editor.g_set.add(quad)
    else:
        for triple in current_graph:
            editor.g_set.add(triple)

    editor.preexisting_finished()

    # Apply deletions
    for item in triples_or_quads_to_delete:
        # Quadstore items are (s, p, o, g); plain triples are (s, p, o).
        if len(item) == 4:
            editor.delete(item[0], item[1], item[2], item[3])
        else:
            editor.delete(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                # NOTE(review): deletions record the snapshot under
                # "restoration_source" while additions below use "source" —
                # confirm the asymmetry is intentional.
                editor.g_set.entity_index[URIRef(subject)]["restoration_source"] = (
                    entity_info["source"]
                )

    # Apply additions
    for item in triples_or_quads_to_add:
        if len(item) == 4:
            editor.create(item[0], item[1], item[2], item[3])
        else:
            editor.create(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["source"] = entity_info[
                    "source"
                ]

    # Handle main entity restoration if needed
    if is_deleted and entity_uri in entity_snapshots:
        editor.g_set.mark_as_restored(URIRef(entity_uri))
        source = entity_snapshots[entity_uri]["source"]
        editor.g_set.entity_index[URIRef(entity_uri)]["source"] = source

    try:
        editor.save()
        flash(gettext("Version restored successfully"), "success")
    except Exception as e:
        flash(
            gettext(
                "An error occurred while restoring the version: %(error)s", error=str(e)
            ),
            "error",
        )

    return redirect(url_for("entity.about", subject=entity_uri))

1346 

1347 

def compute_graph_differences(
    current_graph: Graph | ConjunctiveGraph, historical_graph: Graph | ConjunctiveGraph
):
    """
    Compute the statements separating the current state from a historical one.

    Returns a pair of sets: statements present now but absent historically
    (to delete), and statements present historically but absent now (to add).
    Quads are compared when the dataset is a quadstore, triples otherwise.
    """
    if get_dataset_is_quadstore():
        present = set(current_graph.quads())
        target = set(historical_graph.quads())
    else:
        present = set(current_graph.triples((None, None, None)))
        target = set(historical_graph.triples((None, None, None)))

    # Set difference in both directions yields the minimal edit.
    return present - target, target - present

1361 

1362 

def get_entities_to_restore(
    triples_or_quads_to_delete: set, triples_or_quads_to_add: set, main_entity_uri: str
) -> set:
    """
    Identify all entities that need to be restored based on the graph differences.

    Args:
        triples_or_quads_to_delete: Set of triples/quads to be deleted
        triples_or_quads_to_add: Set of triples/quads to be added
        main_entity_uri: URI of the main entity being restored

    Returns:
        Set of entity URIs that need to be restored
    """
    rdf_type = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
    found = {main_entity_uri}

    for statement in triples_or_quads_to_delete | triples_or_quads_to_add:
        # rdf:type statements never point at restorable related entities.
        if str(statement[1]) == rdf_type:
            continue

        # Both the subject and the object may reference another entity.
        for candidate in (str(statement[0]), str(statement[2])):
            if candidate != main_entity_uri and validators.url(candidate):
                found.add(candidate)

    return found

1391 

1392 

def prepare_entity_snapshots(
    entities_to_restore: set, provenance: dict, target_time: str
) -> dict:
    """
    Prepare snapshot information for all entities that need to be restored.

    Args:
        entities_to_restore: Set of entity URIs to process
        provenance: Dictionary containing provenance data for all entities
        target_time: Target restoration time

    Returns:
        Dictionary mapping entity URIs to their restoration information
        ("source": snapshot URI to cite, "needs_restore": truthy when the
        entity is currently deleted)
    """
    snapshots_info = {}

    for uri in entities_to_restore:
        # Entities without provenance records cannot be restored.
        if uri not in provenance:
            continue

        # Pick the snapshot to cite as the restoration source.
        source = find_appropriate_snapshot(provenance[uri], target_time)
        if not source:
            continue

        # The newest snapshot tells us whether the entity is currently deleted:
        # a deletion snapshot is generated and invalidated at the same instant.
        ordered = sorted(
            provenance[uri].items(),
            key=lambda pair: convert_to_datetime(pair[1]["generatedAtTime"]),
        )
        newest = ordered[-1][1]
        currently_deleted = (
            newest.get("invalidatedAtTime")
            and newest["generatedAtTime"] == newest["invalidatedAtTime"]
        )

        snapshots_info[uri] = {
            "source": source,
            "needs_restore": currently_deleted,
        }

    return snapshots_info

1436 

1437 

def find_appropriate_snapshot(provenance_data: dict, target_time: str) -> Optional[str]:
    """
    Find the most appropriate snapshot to use as a source for restoration.

    Args:
        provenance_data: Dictionary of snapshots and their metadata for an entity
        target_time: The target restoration time as ISO format string

    Returns:
        The URI of the most appropriate snapshot, or None if no suitable snapshot is found
    """
    cutoff = convert_to_datetime(target_time)

    # Collect (generation_time, snapshot_uri) pairs eligible as sources.
    candidates = []
    for snapshot_uri, metadata in provenance_data.items():
        created = convert_to_datetime(metadata["generatedAtTime"])

        # A deletion snapshot is generated and invalidated at the same
        # instant; it carries no restorable state.
        invalidated = metadata.get("invalidatedAtTime")
        if invalidated and metadata["generatedAtTime"] == invalidated:
            continue

        # Only snapshots at or before the target time qualify.
        if created <= cutoff:
            candidates.append((created, snapshot_uri))

    if not candidates:
        return None

    # The most recent eligible snapshot wins (stable sort keeps tie order).
    candidates.sort(key=lambda pair: pair[0])
    return candidates[-1][1]

1473 

1474 

def determine_object_class_and_shape(object_value: str, relevant_snapshot: Graph) -> tuple[Optional[str], Optional[str]]:
    """
    Determine the class and shape for an object value from a graph snapshot.

    Args:
        object_value: The object value (URI or literal)
        relevant_snapshot: Graph snapshot to query for object information

    Returns:
        Tuple of (object_class, object_shape_uri) or (None, None) if not determinable
    """
    # Literals carry no class/shape; likewise nothing can be determined
    # without a snapshot to query.
    if not validators.url(str(object_value)) or not relevant_snapshot:
        return None, None

    subject_ref = URIRef(object_value)
    described_by = list(relevant_snapshot.triples((subject_ref, None, None)))
    if not described_by:
        return None, None

    shape_uri = determine_shape_for_entity_triples(described_by)

    type_uris = [
        str(obj)
        for _, _, obj in relevant_snapshot.triples((subject_ref, RDF.type, None))
    ]
    best_class = get_highest_priority_class(type_uris) if type_uris else None

    return best_class, shape_uri

1503 

1504 

def generate_modification_text(
    modifications: dict,
    highest_priority_class: str,
    entity_shape: str,
    history: dict,
    entity_uri: str,
    current_snapshot: Graph,
    current_snapshot_timestamp: str,
    custom_filter: Filter,
) -> str:
    """
    Generate HTML text describing modifications to an entity, using display rules for property ordering.

    Args:
        modifications (dict): Dictionary of modifications from parse_sparql_update
        highest_priority_class (str): The highest priority class for the subject entity
        entity_shape (str): The shape for the subject entity
        history (dict): Historical snapshots dictionary
        entity_uri (str): URI of the entity being modified
        current_snapshot (Graph): Current entity snapshot
        current_snapshot_timestamp (str): Timestamp of current snapshot
        custom_filter (Filter): Filter instance for formatting

    Returns:
        str: HTML text describing the modifications
    """
    modification_text = "<p><strong>" + gettext("Modifications") + "</strong></p>"

    ordered_properties = get_property_order_from_rules(highest_priority_class, entity_shape)

    for mod_type, triples in modifications.items():
        # One <ul> section per modification kind, headed by an icon.
        modification_text += "<ul class='list-group mb-3'><p>"
        if mod_type == gettext("Additions"):
            modification_text += '<i class="bi bi-plus-circle-fill text-success"></i>'
        elif mod_type == gettext("Deletions"):
            modification_text += '<i class="bi bi-dash-circle-fill text-danger"></i>'
        # NOTE(review): mod_type is already a translated key (it is compared
        # against gettext("Additions") above), so this second gettext() call
        # is likely a no-op — confirm.
        modification_text += " <em>" + gettext(mod_type) + "</em></p>"

        object_shapes_cache = {}
        object_classes_cache = {}

        # For deletions, labels must come from the snapshot *before* this one,
        # since the deleted values no longer exist in the current snapshot.
        relevant_snapshot = None
        if (
            mod_type == gettext("Deletions")
            and history
            and entity_uri
            and current_snapshot_timestamp
        ):
            # NOTE(review): timestamps are sorted lexicographically here,
            # unlike the datetime-keyed sorts elsewhere — confirm history keys
            # share one uniform format.
            sorted_timestamps = sorted(history[entity_uri].keys())
            current_index = sorted_timestamps.index(current_snapshot_timestamp)
            if current_index > 0:
                relevant_snapshot = history[entity_uri][
                    sorted_timestamps[current_index - 1]
                ]
        else:
            relevant_snapshot = current_snapshot

        # Pre-compute class/shape for every object so later lookups are O(1).
        if relevant_snapshot:
            for triple in triples:
                object_value = triple[2]
                object_class, object_shape = determine_object_class_and_shape(object_value, relevant_snapshot)
                object_classes_cache[str(object_value)] = object_class
                object_shapes_cache[str(object_value)] = object_shape

        # Group triples by (predicate, object shape) and cache ordering data.
        predicate_shape_groups = {}
        predicate_ordering_cache = {}
        entity_position_cache = {}

        for triple in triples:
            predicate = str(triple[1])
            object_value = str(triple[2])
            object_shape_uri = object_shapes_cache.get(object_value)

            if predicate not in predicate_ordering_cache:
                predicate_ordering_cache[predicate] = get_predicate_ordering_info(predicate, highest_priority_class, entity_shape)

            # When the predicate defines an ordering property, resolve each
            # object's position in the sequence once and cache it.
            order_property = predicate_ordering_cache[predicate]
            if order_property and validators.url(object_value) and relevant_snapshot:
                position_key = (object_value, predicate)
                if position_key not in entity_position_cache:
                    entity_position_cache[position_key] = get_entity_position_in_sequence(
                        object_value, entity_uri, predicate, order_property, relevant_snapshot
                    )

            group_key = (predicate, object_shape_uri)
            if group_key not in predicate_shape_groups:
                predicate_shape_groups[group_key] = []
            predicate_shape_groups[group_key].append(triple)

        processed_predicates = set()

        # Cached sequence position for a triple's object; unknown → sorts last.
        def get_cached_position(triple, predicate_uri):
            object_value = str(triple[2])
            position_key = (object_value, predicate_uri)
            return entity_position_cache.get(position_key, float('inf'))

        # First emit groups for predicates in the display-rules order.
        for predicate in ordered_properties:
            shape_order = get_shape_order_from_display_rules(highest_priority_class, entity_shape, predicate)
            predicate_groups = []
            for group_key, group_triples in predicate_shape_groups.items():
                predicate_uri, object_shape_uri = group_key
                if predicate_uri == predicate:
                    if object_shape_uri and object_shape_uri in shape_order:
                        shape_priority = shape_order.index(object_shape_uri)
                    else:
                        # Objects without shapes or shapes not in display rules go at the end
                        shape_priority = len(shape_order)

                    predicate_groups.append((shape_priority, group_key, group_triples))

            predicate_groups.sort(key=lambda x: x[0])
            for _, group_key, group_triples in predicate_groups:
                processed_predicates.add(group_key)

                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                # Ordered predicates render their objects in sequence order.
                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        # Then handle any remaining predicate+shape groups not in the ordered list
        for group_key, group_triples in predicate_shape_groups.items():
            if group_key not in processed_predicates:
                # Sort remaining triples by their cached positions too
                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        modification_text += "</ul>"

    return modification_text

1666 

1667 

def format_triple_modification(
    triple: Tuple[URIRef, URIRef, URIRef|Literal],
    highest_priority_class: str,
    entity_shape: str,
    object_shapes_cache: dict,
    object_classes_cache: dict,
    relevant_snapshot: Optional[Graph],
    custom_filter: Filter,
    subject_uri: str = None,
    predicate_ordering_cache: Optional[dict] = None,
    entity_position_cache: Optional[dict] = None,
) -> str:
    """
    Format a single triple modification as HTML.

    Args:
        triple: The RDF triple being modified
        highest_priority_class: The highest priority class for the subject entity
        entity_shape: The shape for the subject entity
        object_shapes_cache: Pre-computed cache of object shapes
        object_classes_cache: Pre-computed cache of object classes
        relevant_snapshot: Graph snapshot for context
        custom_filter (Filter): Filter instance for formatting
        subject_uri: URI of the subject entity (for ordering queries)
        predicate_ordering_cache: Cache of ordering properties per predicate
        entity_position_cache: Cache of (object, predicate) -> sequence position

    Returns:
        str: HTML text describing the modification
    """
    predicate = triple[1]
    object_value = triple[2]
    object_key = str(object_value)
    predicate_key = str(predicate)

    shape_for_object = object_shapes_cache.get(object_key)
    class_for_object = object_classes_cache.get(object_key)

    # Label for the predicate, shape-aware when the object has a known shape.
    predicate_label = custom_filter.human_readable_predicate(
        predicate, (highest_priority_class, entity_shape), object_shape_uri=shape_for_object
    )

    object_label = get_object_label(
        object_value,
        predicate,
        shape_for_object,
        class_for_object,
        relevant_snapshot,
        custom_filter,
        subject_entity_key=(highest_priority_class, entity_shape),
    )

    # Append a position badge when this predicate is ordered and the object's
    # position in the sequence is known.
    order_info = ""
    if subject_uri and validators.url(object_key):
        caches_ready = bool(predicate_ordering_cache and entity_position_cache)
        if caches_ready and predicate_ordering_cache.get(predicate_key):
            position = entity_position_cache.get((object_key, predicate_key))
            if position is not None:
                order_info = f' <span class="order-position-badge">#{position}</span>'

    return f"""
    <li class='d-flex align-items-center'>
        <span class='flex-grow-1 d-flex flex-column justify-content-center ms-3 mb-2 w-100'>
            <strong>{predicate_label}{order_info}</strong>
            <span class="object-value word-wrap">{object_label}</span>
        </span>
    </li>"""

1733 

1734 

1735def get_object_label( 

1736 object_value: str, 

1737 predicate: str, 

1738 object_shape_uri: Optional[str], 

1739 object_class: Optional[str], 

1740 snapshot: Optional[Graph], 

1741 custom_filter: Filter, 

1742 subject_entity_key: Optional[tuple] = None, 

1743) -> str: 

1744 """ 

1745 Get appropriate display label for an object value. 

1746 

1747 Args: 

1748 object_value: The value to get a label for 

1749 predicate: The predicate URI 

1750 object_shape_uri: Pre-computed shape URI for the object 

1751 object_class: Pre-computed class for the object 

1752 snapshot: Graph snapshot for context (essential for deleted triples) 

1753 custom_filter (Filter): Custom filter instance for formatting 

1754 subject_entity_key: Tuple of (class, shape) for the subject entity 

1755 

1756 Returns: 

1757 str: A human-readable label for the object value 

1758 """ 

1759 predicate = str(predicate) 

1760 

1761 if predicate == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type": 

1762 return custom_filter.human_readable_class(subject_entity_key) 

1763 

1764 if validators.url(object_value): 

1765 if object_shape_uri or object_class: 

1766 return custom_filter.human_readable_entity( 

1767 object_value, (object_class, object_shape_uri), snapshot 

1768 ) 

1769 else: 

1770 return str(object_value) 

1771 

1772 return str(object_value) 

1773 

1774 

def process_modification_data(data: dict) -> Tuple[str, List[dict]]:
    """
    Process modification data to extract subjects and predicates.

    Args:
        data: Dictionary containing modification data

    Returns:
        Tuple containing subject URI and list of modification details

    Raises:
        ValueError: If the subject URI or the modification list is missing/empty.
    """
    subject_uri = data.get("subject")
    modifications = data.get("modifications", [])

    # Both pieces are mandatory; reject the payload early when either is
    # missing or empty.
    if not subject_uri:
        raise ValueError("No subject URI provided in modification data")
    if not modifications:
        raise ValueError("No modifications provided in data")

    return subject_uri, modifications

1794 

1795 

def validate_modification(
    modification: dict, subject_uri: str
) -> Tuple[bool, str]:
    """
    Validate a single modification operation.

    Args:
        modification: Dictionary containing modification details
        subject_uri: URI of the subject being modified

    Returns:
        Tuple of (is_valid, error_message)
    """
    form_fields = get_form_fields()

    # Structural checks first: an operation and a predicate are mandatory.
    operation = modification.get("operation")
    if not operation:
        return False, "No operation specified in modification"

    predicate = modification.get("predicate")
    if not predicate:
        return False, "No predicate specified in modification"

    if operation not in ("add", "remove", "update"):
        return False, f"Invalid operation: {operation}"

    # Without SHACL-derived form fields there is nothing more to check.
    if not form_fields:
        return True, ""

    entity_type = modification.get("entity_type")
    entity_shape = modification.get("entity_shape")

    # If entity_type is not provided in modification, get it from the database
    if not entity_type:
        stored_types = get_entity_types(subject_uri)
        if stored_types:
            entity_type = get_highest_priority_class(stored_types)

    matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)
    if not matching_key:
        return True, ""

    # Enforce SHACL cardinality constraints for the targeted predicate.
    for field in form_fields[matching_key].get(predicate, []):
        if operation == "remove" and field.get("minCount", 0) > 0:
            return False, f"Cannot remove required predicate: {predicate}"

        if operation == "add":
            current_count = get_predicate_count(subject_uri, predicate)
            max_count = field.get("maxCount")

            if max_count and current_count >= max_count:
                return False, f"Maximum count exceeded for predicate: {predicate}"

    return True, ""

1852 

def get_predicate_count(subject_uri: str, predicate: str) -> int:
    """
    Count how many object values the entity currently holds for a predicate.

    Args:
        subject_uri: URI of the entity
        predicate: Predicate URI to count

    Returns:
        Number of values for the predicate
    """
    # NOTE: subject_uri and predicate are interpolated directly into the
    # query; both are expected to be valid URIs at this point.
    query = f"""
    SELECT (COUNT(?o) as ?count) WHERE {{
        <{subject_uri}> <{predicate}> ?o .
    }}
    """

    endpoint = get_sparql()
    endpoint.setQuery(query)
    endpoint.setReturnFormat(JSON)
    response = endpoint.query().convert()

    binding = response["results"]["bindings"][0]
    return int(binding["count"]["value"])

1877 

1878 

def _to_rdf_object(value, datatype):
    """
    Convert a raw payload value into an RDF term.

    URLs become URIRef nodes; any other value becomes a Literal typed
    with the given datatype URI.

    Args:
        value: Raw value from the modification payload
        datatype: Datatype URI applied when the value is not a URL

    Returns:
        URIRef or Literal representing the value
    """
    if validators.url(value):
        return URIRef(value)
    return Literal(value, datatype=URIRef(datatype))


def apply_modifications(
    editor: Editor,
    modifications: List[dict],
    subject_uri: str,
    graph_uri: Optional[str] = None,
):
    """
    Apply a list of modifications to an entity.

    Each modification dict must carry an "operation" ("remove", "add" or
    "update") and a "predicate"; "add" additionally needs "value", and
    "update" needs "oldValue" and "newValue". An optional "datatype"
    (default xsd:string) types non-URL values. Modifications with any
    other operation value are silently skipped.

    Args:
        editor: Editor instance to use for modifications
        modifications: List of modification operations
        subject_uri: URI of the entity being modified
        graph_uri: Optional graph URI for quad store
    """
    # Hoist the invariant subject conversion out of the loop.
    subject = URIRef(subject_uri)

    for mod in modifications:
        operation = mod["operation"]
        predicate = URIRef(mod["predicate"])

        if operation == "remove":
            editor.delete(subject, predicate, graph_uri=graph_uri)

        elif operation == "add":
            obj = _to_rdf_object(mod["value"], mod.get("datatype", XSD.string))
            editor.create(subject, predicate, obj, graph_uri)

        elif operation == "update":
            datatype = mod.get("datatype", XSD.string)
            editor.update(
                subject,
                predicate,
                _to_rdf_object(mod["oldValue"], datatype),
                _to_rdf_object(mod["newValue"], datatype),
                graph_uri,
            )