Coverage for heritrace/routes/entity.py: 89%

684 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-18 11:10 +0000

1import json 

2from datetime import datetime 

3from typing import Dict, List, Optional, Tuple 

4import re 

5 

6import validators 

7from flask import (Blueprint, abort, current_app, flash, jsonify, redirect, 

8 render_template, request, url_for) 

9from flask_babel import gettext 

10from flask_login import current_user, login_required 

11from heritrace.editor import Editor 

12from heritrace.extensions import (get_change_tracking_config, 

13 get_custom_filter, get_dataset_endpoint, 

14 get_dataset_is_quadstore, get_display_rules, 

15 get_form_fields, get_provenance_endpoint, 

16 get_provenance_sparql, get_shacl_graph, 

17 get_sparql) 

18from heritrace.forms import * 

19from heritrace.utils.converters import convert_to_datetime 

20from heritrace.utils.display_rules_utils import (get_class_priority, 

21 get_grouped_triples, 

22 get_highest_priority_class, 

23 get_property_order_from_rules, 

24 is_entity_type_visible) 

25from heritrace.utils.filters import Filter 

26from heritrace.utils.shacl_utils import get_valid_predicates 

27from heritrace.utils.sparql_utils import ( 

28 fetch_current_state_with_related_entities, fetch_data_graph_for_subject, 

29 parse_sparql_update, get_entity_types 

30) 

31from heritrace.utils.uri_utils import generate_unique_uri 

32from heritrace.utils.virtuoso_utils import (VIRTUOSO_EXCLUDED_GRAPHS, 

33 is_virtuoso) 

34from rdflib import RDF, XSD, ConjunctiveGraph, Graph, Literal, URIRef 

35from resources.datatypes import DATATYPE_MAPPING 

36from SPARQLWrapper import JSON 

37from time_agnostic_library.agnostic_entity import AgnosticEntity 

38 

39entity_bp = Blueprint("entity", __name__) 

40 

41 

@entity_bp.route("/about/<path:subject>")
@login_required
def about(subject):
    """
    Display detailed information about an entity.

    Renders the entity detail page with the entity's current triples
    (grouped for display), the predicates that may be added or deleted,
    inverse references from other entities, and — when the entity has been
    deleted — a read-only view based on its last pre-deletion snapshot.

    Args:
        subject: URI of the entity to display
    """
    # Get necessary services and configurations
    change_tracking_config = get_change_tracking_config()

    # Initialize agnostic entity and get its history
    agnostic_entity = AgnosticEntity(
        res=subject, config=change_tracking_config, related_entities_history=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    is_deleted = False
    context_snapshot = None
    subject_classes = []

    # Process entity history
    if history.get(subject):
        sorted_timestamps = sorted(history[subject].keys())
        latest_snapshot = history[subject][sorted_timestamps[-1]]
        # Provenance metadata entry that matches the most recent snapshot.
        latest_metadata = next(
            (
                meta
                for _, meta in provenance[subject].items()
                if meta["generatedAtTime"] == sorted_timestamps[-1]
            ),
            None,
        )

        # The entity counts as deleted when its latest snapshot carries a
        # non-empty invalidation time.
        is_deleted = (
            latest_metadata
            and "invalidatedAtTime" in latest_metadata
            and latest_metadata["invalidatedAtTime"]
        )

        if is_deleted and len(sorted_timestamps) > 1:
            # Use the snapshot preceding the deletion so the page can still
            # show what the entity looked like before it was removed.
            context_snapshot = history[subject][sorted_timestamps[-2]]
            subject_classes = [
                o
                for _, _, o in context_snapshot.triples(
                    (URIRef(subject), RDF.type, None)
                )
            ]
        else:
            context_snapshot = None

    grouped_triples = {}
    can_be_added = []
    can_be_deleted = []
    datatypes = {}
    mandatory_values = {}
    optional_values = {}
    valid_predicates = []
    entity_type = None
    data_graph = None
    linked_resources = []
    inverse_references = []

    if not is_deleted:
        # Fetch current entity state
        data_graph = fetch_data_graph_for_subject(subject)
        if data_graph:
            triples = list(data_graph.triples((None, None, None)))
            # Get valid predicates and other metadata
            (
                can_be_added,
                can_be_deleted,
                datatypes,
                mandatory_values,
                optional_values,
                subject_classes,
                valid_predicates,
            ) = get_valid_predicates(triples)

            # Group triples for display
            grouped_triples, relevant_properties = get_grouped_triples(
                subject, triples, subject_classes, valid_predicates
            )

            # Only offer add/delete actions for properties actually rendered.
            can_be_added = [uri for uri in can_be_added if uri in relevant_properties]
            can_be_deleted = [
                uri for uri in can_be_deleted if uri in relevant_properties
            ]

            # Get resources that this entity links to (outgoing links)
            linked_resources = set()
            for _, predicate, obj in data_graph.triples((URIRef(subject), None, None)):
                if isinstance(obj, URIRef) and str(obj) != str(subject) and predicate != RDF.type:
                    linked_resources.add(str(obj))

            # Get inverse references only for non-deleted entities
            inverse_references = get_inverse_references(subject)

            # Add inverse references to linked resources
            for ref in inverse_references:
                linked_resources.add(ref["subject"])

            # Convert to list
            linked_resources = list(linked_resources)

    else:
        # For deleted entities, we don't need to get any linked resources
        linked_resources = []

    update_form = UpdateTripleForm()
    # Use a select-based creation form when the addable predicates are known;
    # otherwise fall back to free-text predicate input.
    create_form = (
        CreateTripleFormWithSelect() if can_be_added else CreateTripleFormWithInput()
    )
    if can_be_added:
        create_form.predicate.choices = [
            (p, get_custom_filter().human_readable_predicate(p, subject_classes))
            for p in can_be_added
        ]

    form_fields = get_form_fields()
    entity_types = list(form_fields.keys())

    # Index form-field definitions by (predicate, entity type, node shape) so
    # the template can look up the right definition for each displayed triple.
    predicate_details_map = {}
    for entity_type_key, predicates in form_fields.items():
        for predicate_uri, details_list in predicates.items():
            for details in details_list:
                shape = details.get("nodeShape")
                key = (predicate_uri, entity_type_key, shape)
                predicate_details_map[key] = details

    # Ensure entity_type is set correctly using the potentially updated subject_classes
    entity_type = str(get_highest_priority_class(subject_classes)) if subject_classes else None

    return render_template(
        "entity/about.jinja",
        subject=subject,
        history=history,
        can_be_added=can_be_added,
        can_be_deleted=can_be_deleted,
        datatypes=datatypes,
        update_form=update_form,
        create_form=create_form,
        mandatory_values=mandatory_values,
        optional_values=optional_values,
        shacl=bool(len(get_shacl_graph())),
        grouped_triples=grouped_triples,
        subject_classes=[str(s_class) for s_class in subject_classes],
        display_rules=get_display_rules(),
        form_fields=form_fields,
        entity_types=entity_types,
        entity_type=entity_type,
        predicate_details_map=predicate_details_map,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        inverse_references=inverse_references,
        is_deleted=is_deleted,
        context=context_snapshot,
        linked_resources=linked_resources,
    )

204 

205 

@entity_bp.route("/create-entity", methods=["GET", "POST"])
@login_required
def create_entity():
    """
    Render the entity-creation page and handle creation submissions.

    GET renders the creation form. POST reads the JSON-encoded
    ``structured_data`` form field, validates it against the SHACL-derived
    form fields when they are available, writes the new entity (and any
    nested entities) through the Editor, and returns a JSON response with a
    redirect URL on success or a list of errors on failure.
    """
    form_fields = get_form_fields()

    # Creatable entity types, highest class priority first.
    entity_types = sorted(
        [
            entity_type
            for entity_type in form_fields.keys()
            if is_entity_type_visible(entity_type)
        ],
        key=lambda et: get_class_priority(et),
        reverse=True,
    )

    # Human-readable datatype labels offered in the UI, mapped to XSD types.
    datatype_options = {
        gettext("Text (string)"): XSD.string,
        gettext("Whole number (integer)"): XSD.integer,
        gettext("True or False (boolean)"): XSD.boolean,
        gettext("Date (YYYY-MM-DD)"): XSD.date,
        gettext("Date and Time (YYYY-MM-DDThh:mm:ss)"): XSD.dateTime,
        gettext("Decimal number"): XSD.decimal,
        gettext("Floating point number"): XSD.float,
        gettext("Double precision floating point number"): XSD.double,
        gettext("Time (hh:mm:ss)"): XSD.time,
        gettext("Year (YYYY)"): XSD.gYear,
        gettext("Month (MM)"): XSD.gMonth,
        gettext("Day of the month (DD)"): XSD.gDay,
        gettext("Duration (e.g., P1Y2M3DT4H5M6S)"): XSD.duration,
        gettext("Hexadecimal binary"): XSD.hexBinary,
        gettext("Base64 encoded binary"): XSD.base64Binary,
        gettext("Web address (URL)"): XSD.anyURI,
        gettext("Language code (e.g., en, it)"): XSD.language,
        gettext("Normalized text (no line breaks)"): XSD.normalizedString,
        gettext("Tokenized text (single word)"): XSD.token,
        gettext("Non-positive integer (0 or negative)"): XSD.nonPositiveInteger,
        gettext("Negative integer"): XSD.negativeInteger,
        gettext("Long integer"): XSD.long,
        gettext("Short integer"): XSD.short,
        gettext("Byte-sized integer"): XSD.byte,
        gettext("Non-negative integer (0 or positive)"): XSD.nonNegativeInteger,
        gettext("Positive integer (greater than 0)"): XSD.positiveInteger,
        gettext("Unsigned long integer"): XSD.unsignedLong,
        gettext("Unsigned integer"): XSD.unsignedInt,
        gettext("Unsigned short integer"): XSD.unsignedShort,
        gettext("Unsigned byte"): XSD.unsignedByte,
    }

    if request.method == "POST":
        structured_data = json.loads(request.form.get("structured_data", "{}"))

        # Editor attributing provenance to the current user's ORCID.
        editor = Editor(
            get_dataset_endpoint(),
            get_provenance_endpoint(),
            current_app.config["COUNTER_HANDLER"],
            URIRef(f"https://orcid.org/{current_user.orcid}"),
            current_app.config["PRIMARY_SOURCE"],
            current_app.config["DATASET_GENERATION_TIME"],
            dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        )

        if form_fields:
            # SHACL form fields available: validate before writing anything.
            validation_errors = validate_entity_data(structured_data, form_fields)
            if validation_errors:
                return jsonify({"status": "error", "errors": validation_errors}), 400

            entity_type = structured_data.get("entity_type")
            properties = structured_data.get("properties", {})

            entity_uri = generate_unique_uri(entity_type)
            editor.preexisting_finished()

            # On quadstores every entity gets its own named graph.
            default_graph_uri = (
                URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
            )

            for predicate, values in properties.items():
                if not isinstance(values, list):
                    values = [values]

                field_definitions = form_fields.get(entity_type, {}).get(predicate, [])

                # Get the shape from the property value if available
                property_shape = None
                if values and isinstance(values[0], dict):
                    property_shape = values[0].get("shape")

                # Filter field definitions to find the matching one based on shape
                matching_field_def = None
                for field_def in field_definitions:
                    if property_shape:
                        # If property has a shape, match it with the field definition's subjectShape
                        if field_def.get("subjectShape") == property_shape:
                            matching_field_def = field_def
                            break
                    else:
                        # If no shape specified, use the first field definition without a shape requirement
                        if not field_def.get("subjectShape"):
                            matching_field_def = field_def
                            break

                # If no matching field definition found, use the first one (default behavior)
                if not matching_field_def and field_definitions:
                    matching_field_def = field_definitions[0]

                ordered_by = (
                    matching_field_def.get("orderedBy") if matching_field_def else None
                )

                if ordered_by:
                    # Handle ordered properties, grouping values by shape.
                    values_by_shape = {}
                    for value in values:
                        # Get the entity's shape.
                        shape = value.get("shape")
                        if not shape:
                            shape = "default_shape"
                        if shape not in values_by_shape:
                            values_by_shape[shape] = []
                        values_by_shape[shape].append(value)

                    # Now process each per-shape group of values separately.
                    for shape, shape_values in values_by_shape.items():
                        previous_entity = None
                        for value in shape_values:
                            if isinstance(value, dict) and "entity_type" in value:
                                nested_uri = generate_unique_uri(value["entity_type"])
                                editor.create(
                                    entity_uri,
                                    URIRef(predicate),
                                    nested_uri,
                                    default_graph_uri,
                                )
                                create_nested_entity(
                                    editor,
                                    nested_uri,
                                    value,
                                    default_graph_uri,
                                    form_fields,
                                )
                            else:
                                # If it's a direct URI value (reference to existing entity)
                                nested_uri = URIRef(value)
                                editor.create(
                                    entity_uri,
                                    URIRef(predicate),
                                    nested_uri,
                                    default_graph_uri,
                                )

                            # Chain consecutive members with the ordering predicate.
                            if previous_entity:
                                editor.create(
                                    previous_entity,
                                    URIRef(ordered_by),
                                    nested_uri,
                                    default_graph_uri,
                                )
                            previous_entity = nested_uri
                else:
                    # Handle unordered properties.
                    for value in values:
                        if isinstance(value, dict) and "entity_type" in value:
                            nested_uri = generate_unique_uri(value["entity_type"])
                            editor.create(
                                entity_uri,
                                URIRef(predicate),
                                nested_uri,
                                default_graph_uri,
                            )
                            create_nested_entity(
                                editor,
                                nested_uri,
                                value,
                                default_graph_uri,
                                form_fields,
                            )
                        else:
                            # Handle both URI references and literal values
                            if validators.url(str(value)):
                                object_value = URIRef(value)
                            else:
                                datatype_uris = []
                                if matching_field_def:
                                    datatype_uris = matching_field_def.get(
                                        "datatypes", []
                                    )
                                datatype = determine_datatype(value, datatype_uris)
                                object_value = Literal(value, datatype=datatype)
                            editor.create(
                                entity_uri,
                                URIRef(predicate),
                                object_value,
                                default_graph_uri,
                            )
        else:
            # No SHACL form fields: trust the raw payload, where each value
            # dict carries an explicit "type" ("uri" or "literal").
            properties = structured_data.get("properties", {})

            entity_uri = generate_unique_uri()
            editor.import_entity(entity_uri)
            editor.preexisting_finished()

            default_graph_uri = (
                URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
            )

            for predicate, values in properties.items():
                if not isinstance(values, list):
                    values = [values]
                for value_dict in values:
                    if value_dict["type"] == "uri":
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            URIRef(value_dict["value"]),
                            default_graph_uri,
                        )
                    elif value_dict["type"] == "literal":
                        datatype = (
                            URIRef(value_dict["datatype"])
                            if "datatype" in value_dict
                            else XSD.string
                        )
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            Literal(value_dict["value"], datatype=datatype),
                            default_graph_uri,
                        )

        try:
            editor.save()
            response = jsonify(
                {
                    "status": "success",
                    "redirect_url": url_for("entity.about", subject=str(entity_uri)),
                }
            )
            flash(gettext("Entity created successfully"), "success")
            return response, 200
        except Exception as e:
            error_message = gettext(
                "An error occurred while creating the entity: %(error)s", error=str(e)
            )
            return jsonify({"status": "error", "errors": [error_message]}), 500

    return render_template(
        "create_entity.jinja",
        shacl=bool(get_form_fields()),
        entity_types=entity_types,
        form_fields=form_fields,
        datatype_options=datatype_options,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
    )

462 

463 

def create_nested_entity(
    editor: Editor, entity_uri, entity_data, graph_uri=None, form_fields=None
):
    """
    Recursively create a nested entity and all of its properties.

    Writes an ``rdf:type`` triple for the entity, then one triple per
    property value. Dict values that carry an ``entity_type`` are created
    recursively (optionally via an intermediate relation entity); other
    values become URI references or typed literals.

    Args:
        editor: Editor used to stage the triples.
        entity_uri: URI of the entity being created.
        entity_data: Dict with ``entity_type`` and a ``properties`` mapping
            of predicate URI -> value or list of values.
        graph_uri: Optional named graph to write into (quadstores only).
        form_fields: SHACL-derived field definitions used to pick literal
            datatypes; may be omitted.
    """
    # Fix: the original dereferenced form_fields unconditionally, so calling
    # with the default (None) raised AttributeError. Fall back to an empty
    # mapping, which yields no field definitions and the XSD.string default.
    if form_fields is None:
        form_fields = {}

    # Add rdf:type
    editor.create(
        entity_uri,
        URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
        URIRef(entity_data["entity_type"]),
        graph_uri,
    )

    entity_type = entity_data.get("entity_type")
    properties = entity_data.get("properties", {})

    # Add other properties
    for predicate, values in properties.items():
        if not isinstance(values, list):
            values = [values]
        field_definitions = form_fields.get(entity_type, {}).get(predicate, [])
        for value in values:
            if isinstance(value, dict) and "entity_type" in value:
                if "intermediateRelation" in value:
                    # Link through an intermediate entity:
                    # entity -> intermediate -> target.
                    intermediate_uri = generate_unique_uri(
                        value["intermediateRelation"]["class"]
                    )
                    target_uri = generate_unique_uri(value["entity_type"])
                    editor.create(
                        entity_uri, URIRef(predicate), intermediate_uri, graph_uri
                    )
                    editor.create(
                        intermediate_uri,
                        URIRef(value["intermediateRelation"]["property"]),
                        target_uri,
                        graph_uri,
                    )
                    create_nested_entity(
                        editor, target_uri, value, graph_uri, form_fields
                    )
                else:
                    # Handle nested entities
                    nested_uri = generate_unique_uri(value["entity_type"])
                    editor.create(entity_uri, URIRef(predicate), nested_uri, graph_uri)
                    create_nested_entity(
                        editor, nested_uri, value, graph_uri, form_fields
                    )
            else:
                # Handle simple properties
                datatype = XSD.string  # Default to string if not specified
                datatype_uris = []
                if field_definitions:
                    datatype_uris = field_definitions[0].get("datatypes", [])
                datatype = determine_datatype(value, datatype_uris)
                # URL-shaped strings become URI references, everything else a
                # typed literal.
                object_value = (
                    URIRef(value)
                    if validators.url(value)
                    else Literal(value, datatype=datatype)
                )
                editor.create(entity_uri, URIRef(predicate), object_value, graph_uri)

522 

523 

def determine_datatype(value, datatype_uris):
    """
    Pick the first candidate datatype URI whose validator accepts *value*.

    Each candidate is looked up in DATATYPE_MAPPING (matching on the string
    form of the URI, first entry wins); when the mapped validation function
    accepts the value, that datatype is returned as a URIRef. If no
    candidate validates, XSD.string is used as the fallback.
    """
    for candidate_uri in datatype_uris:
        wanted = str(candidate_uri)
        checker = None
        # Find the first mapping entry registered for this datatype URI.
        for entry in DATATYPE_MAPPING:
            if str(entry[0]) == wanted:
                checker = entry[1]
                break
        if checker is not None and checker(value):
            return URIRef(candidate_uri)
    # If none match, default to XSD.string
    return XSD.string

533 

534 

def validate_entity_data(structured_data, form_fields):
    """
    Validates entity data against form field definitions, considering shape matching.

    Checks, in order: that the entity type exists in the schema; that every
    submitted property is known; cardinality (min/max); mandatory values;
    per-value datatype validity and membership in the allowed (optional)
    values; and finally that no required property is missing entirely.
    Nested entity values are validated recursively.

    Args:
        structured_data (dict): Data to validate containing entity_type and properties
        form_fields (dict): Form field definitions from SHACL shapes

    Returns:
        list: List of validation error messages, empty if validation passes
    """
    custom_filter = get_custom_filter()

    errors = []
    entity_type = structured_data.get("entity_type")
    if not entity_type:
        errors.append(gettext("Entity type is required"))
    elif entity_type not in form_fields:
        errors.append(
            gettext(
                "Invalid entity type selected: %(entity_type)s",
                entity_type=entity_type,
            )
        )

    # Without a valid entity type no further checks are meaningful.
    if errors:
        return errors

    entity_fields = form_fields.get(entity_type, {})
    properties = structured_data.get("properties", {})

    for prop_uri, prop_values in properties.items():
        # rdf:type is managed by the system, not by the submitted form.
        if URIRef(prop_uri) == RDF.type:
            continue

        field_definitions = entity_fields.get(prop_uri)
        if not field_definitions:
            errors.append(
                gettext(
                    "Unknown property %(prop_uri)s for entity type %(entity_type)s",
                    prop_uri=prop_uri,
                    entity_type=entity_type,
                )
            )
            continue

        if not isinstance(prop_values, list):
            prop_values = [prop_values]

        # Get the shape from the property value if available
        property_shape = None
        if prop_values and isinstance(prop_values[0], dict):
            property_shape = prop_values[0].get("shape")

        # Filter field definitions to find the matching one based on shape
        matching_field_def = None
        for field_def in field_definitions:
            if property_shape:
                # If property has a shape, match it with the field definition's subjectShape
                if field_def.get("subjectShape") == property_shape:
                    matching_field_def = field_def
                    break
            else:
                # If no shape specified, use the first field definition without a shape requirement
                if not field_def.get("subjectShape"):
                    matching_field_def = field_def
                    break

        # If no matching field definition found, use the first one (default behavior)
        if not matching_field_def and field_definitions:
            matching_field_def = field_definitions[0]

        if matching_field_def:
            # Validate cardinality
            min_count = matching_field_def.get("min", 0)
            max_count = matching_field_def.get("max", None)
            value_count = len(prop_values)

            if value_count < min_count:
                value = gettext("values") if min_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s requires at least %(min_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(
                            prop_uri, [entity_type]
                        ),
                        min_count=min_count,
                        value=value,
                    )
                )
            if max_count is not None and value_count > max_count:
                value = gettext("values") if max_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s allows at most %(max_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(
                            prop_uri, [entity_type]
                        ),
                        max_count=max_count,
                        value=value,
                    )
                )

            # Validate mandatory values
            mandatory_values = matching_field_def.get("mandatory_values", [])
            for mandatory_value in mandatory_values:
                if mandatory_value not in prop_values:
                    errors.append(
                        gettext(
                            "Property %(prop_uri)s requires the value %(mandatory_value)s",
                            prop_uri=custom_filter.human_readable_predicate(
                                prop_uri, [entity_type]
                            ),
                            mandatory_value=mandatory_value,
                        )
                    )

            # Validate each value
            for value in prop_values:
                if isinstance(value, dict) and "entity_type" in value:
                    # Nested entity: recurse with the same form-field schema.
                    nested_errors = validate_entity_data(value, form_fields)
                    errors.extend(nested_errors)
                else:
                    # Validate against datatypes
                    datatypes = matching_field_def.get("datatypes", [])
                    if datatypes:
                        # The value is valid if ANY declared datatype accepts it.
                        is_valid_datatype = False
                        for dtype in datatypes:
                            validation_func = next(
                                (
                                    d[1]
                                    for d in DATATYPE_MAPPING
                                    if d[0] == URIRef(dtype)
                                ),
                                None,
                            )
                            if validation_func and validation_func(value):
                                is_valid_datatype = True
                                break
                        if not is_valid_datatype:
                            expected_types = ", ".join(
                                [
                                    custom_filter.human_readable_predicate(
                                        dtype, form_fields.keys()
                                    )
                                    for dtype in datatypes
                                ]
                            )
                            errors.append(
                                gettext(
                                    'Value "%(value)s" for property %(prop_uri)s is not of expected type %(expected_types)s',
                                    value=value,
                                    prop_uri=custom_filter.human_readable_predicate(
                                        prop_uri, form_fields.keys()
                                    ),
                                    expected_types=expected_types,
                                )
                            )

                    # Validate against optional values
                    optional_values = matching_field_def.get("optionalValues", [])
                    if optional_values and value not in optional_values:
                        acceptable_values = ", ".join(
                            [
                                custom_filter.human_readable_predicate(
                                    val, form_fields.keys()
                                )
                                for val in optional_values
                            ]
                        )
                        errors.append(
                            gettext(
                                'Value "%(value)s" is not permitted for property %(prop_uri)s. Acceptable values are: %(acceptable_values)s',
                                value=value,
                                prop_uri=custom_filter.human_readable_predicate(
                                    prop_uri, form_fields.keys()
                                ),
                                acceptable_values=acceptable_values,
                            )
                        )

    # In the RDF model, a property with zero values is equivalent to the property being absent,
    # as a triple requires a subject, predicate, and object. Therefore, this section checks for
    # properties defined in the schema that are completely absent from the input data but are
    # required (min_count > 0). This complements the cardinality check above, which only
    # validates properties that are present in the data.
    # Check for missing required properties
    for prop_uri, field_definitions in entity_fields.items():
        if prop_uri not in properties:
            for field_def in field_definitions:
                min_count = field_def.get("min", 0)
                if min_count > 0:
                    value = gettext("values") if min_count > 1 else gettext("value")
                    errors.append(
                        gettext(
                            "Missing required property: %(prop_uri)s requires at least %(min_count)d %(value)s",
                            prop_uri=custom_filter.human_readable_predicate(
                                prop_uri, [entity_type]
                            ),
                            min_count=min_count,
                            value=value,
                        )
                    )
                    break  # Only need to report once per property

    return errors

741 

742 

@entity_bp.route("/entity-history/<path:entity_uri>")
@login_required
def entity_history(entity_uri):
    """
    Display the history of changes for an entity.

    Builds a timeline data structure with one event per provenance
    snapshot, each carrying the responsible agent, primary source, a
    formatted description, and a human-readable rendering of the snapshot's
    SPARQL update (when present).

    Args:
        entity_uri: URI of the entity
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, related_entities_history=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    # Snapshots ordered by generation time, oldest first.
    sorted_metadata = sorted(
        provenance[entity_uri].items(),
        key=lambda x: convert_to_datetime(x[1]["generatedAtTime"]),
    )
    sorted_timestamps = [
        convert_to_datetime(meta["generatedAtTime"], stringify=True)
        for _, meta in sorted_metadata
    ]

    # Get correct context for entity label
    latest_metadata = sorted_metadata[-1][1] if sorted_metadata else None
    is_latest_deletion = (
        latest_metadata
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    )
    # For a deleted entity, label it from the snapshot preceding the deletion.
    if is_latest_deletion and len(sorted_timestamps) > 1:
        context_snapshot = history[entity_uri][sorted_timestamps[-2]]
    else:
        context_snapshot = history[entity_uri][sorted_timestamps[-1]]

    entity_classes = set()
    classes = list(context_snapshot.triples((URIRef(entity_uri), RDF.type, None)))
    for triple in classes:
        entity_classes.add(str(triple[2]))
    highest_priority_class = get_highest_priority_class(entity_classes)
    entity_classes_for_label = [highest_priority_class] if highest_priority_class else []

    # Generate timeline events
    events = []
    for i, (snapshot_uri, metadata) in enumerate(sorted_metadata):
        date = convert_to_datetime(metadata["generatedAtTime"])
        snapshot_timestamp_str = convert_to_datetime(
            metadata["generatedAtTime"], stringify=True
        )
        snapshot_graph = history[entity_uri][snapshot_timestamp_str]

        responsible_agent = custom_filter.format_agent_reference(
            metadata["wasAttributedTo"]
        )
        primary_source = custom_filter.format_source_reference(
            metadata["hadPrimarySource"]
        )

        description = _format_snapshot_description(
            metadata,
            entity_uri,
            entity_classes_for_label,
            context_snapshot,
            history,
            sorted_timestamps,
            i,
            custom_filter,
        )
        # Render the snapshot's SPARQL update (if any) as a readable change list.
        modifications = metadata.get("hasUpdateQuery", "")
        modification_text = ""
        if modifications:
            parsed_modifications = parse_sparql_update(modifications)
            modification_text = generate_modification_text(
                parsed_modifications,
                list(entity_classes),
                history=history,
                entity_uri=entity_uri,
                current_snapshot=snapshot_graph,
                current_snapshot_timestamp=snapshot_timestamp_str,
                custom_filter=custom_filter,
                form_fields=get_form_fields(),
            )

        event = {
            "start_date": {
                "year": date.year,
                "month": date.month,
                "day": date.day,
                "hour": date.hour,
                "minute": date.minute,
                "second": date.second,
            },
            "text": {
                "headline": gettext("Snapshot") + " " + str(i + 1),
                "text": f"""
                    <p><strong>{gettext('Responsible agent')}:</strong> {responsible_agent}</p>
                    <p><strong>{gettext('Primary source')}:</strong> {primary_source}</p>
                    <p><strong>{gettext('Description')}:</strong> {description}</p>
                    <div class="modifications mb-3">
                        {modification_text}
                    </div>
                    <a href='/entity-version/{entity_uri}/{metadata["generatedAtTime"]}' class='btn btn-outline-primary mt-2 view-version' target='_self'>{gettext('View version')}</a>
                """,
            },
            "autolink": False,
        }

        # Each snapshot's validity interval ends where the next one begins.
        if i + 1 < len(sorted_metadata):
            next_date = convert_to_datetime(
                sorted_metadata[i + 1][1]["generatedAtTime"]
            )
            event["end_date"] = {
                "year": next_date.year,
                "month": next_date.month,
                "day": next_date.day,
                "hour": next_date.hour,
                "minute": next_date.minute,
                "second": next_date.second,
            }

        events.append(event)

    entity_label = custom_filter.human_readable_entity(
        entity_uri, entity_classes, context_snapshot
    )

    timeline_data = {
        "entityUri": entity_uri,
        "entityLabel": entity_label,
        "entityClasses": list(entity_classes),
        "events": events,
    }

    return render_template("entity/history.jinja", timeline_data=timeline_data)

880 

881 

def _format_snapshot_description(
    metadata: dict,
    entity_uri: str,
    entity_classes: list[str],
    context_snapshot: Graph,
    history: dict,
    sorted_timestamps: list[str],
    current_index: int,
    custom_filter: Filter,
) -> str:
    """
    Format a snapshot's provenance description for display.

    For merge snapshots (more than one ``wasDerivedFrom`` entry), the URI of
    the merged-in entity mentioned in the description is replaced with its
    human-readable label, resolved against the snapshot taken just before
    the merge. The main entity's quoted URI is likewise replaced with its
    label.

    Args:
        metadata: The snapshot metadata dictionary.
        entity_uri: The URI of the main entity.
        entity_classes: The classes of the main entity.
        context_snapshot: The graph snapshot for context.
        history: The history dictionary containing snapshots.
        sorted_timestamps: Sorted list of snapshot timestamps.
        current_index: The index of the current snapshot in sorted_timestamps.
        custom_filter: The custom filter instance for formatting.

    Returns:
        The formatted description string.
    """
    # Note: the return annotation previously claimed Tuple[str, bool], but the
    # function returns only the description string; the annotation is fixed here.
    description = metadata.get("description", "")
    # A snapshot produced by a merge derives from multiple prior snapshots.
    is_merge_snapshot = False
    was_derived_from = metadata.get('wasDerivedFrom')
    if isinstance(was_derived_from, list) and len(was_derived_from) > 1:
        is_merge_snapshot = True

    if is_merge_snapshot:
        # Regex to find URI after "merged with", potentially enclosed in single quotes or none
        match = re.search(r"merged with ['‘]?([^'’<>\s]+)['’]?", description)
        if match:
            potential_merged_uri = match.group(1)
            if validators.url(potential_merged_uri):
                merged_entity_uri_from_desc = potential_merged_uri
                merged_entity_label = None
                if current_index > 0:
                    # The merged entity's classes are only visible in the
                    # snapshot taken just before the merge happened.
                    previous_snapshot_timestamp = sorted_timestamps[current_index - 1]
                    previous_snapshot_graph = history.get(entity_uri, {}).get(previous_snapshot_timestamp)
                    if previous_snapshot_graph:
                        raw_merged_entity_classes = [
                            str(o)
                            for s, p, o in previous_snapshot_graph.triples(
                                (URIRef(merged_entity_uri_from_desc), RDF.type, None)
                            )
                        ]
                        highest_priority_merged_class = get_highest_priority_class(
                            raw_merged_entity_classes
                        ) if raw_merged_entity_classes else None
                        merged_entity_classes_for_label = (
                            [highest_priority_merged_class]
                            if highest_priority_merged_class
                            else []
                        )
                        merged_entity_label = custom_filter.human_readable_entity(
                            merged_entity_uri_from_desc,
                            merged_entity_classes_for_label,
                            previous_snapshot_graph,
                        )
                        # Substitute only when a real label (not the bare URI)
                        # could be resolved.
                        if (
                            merged_entity_label
                            and merged_entity_label != merged_entity_uri_from_desc
                        ):
                            description = description.replace(
                                match.group(0), f"merged with '{merged_entity_label}'"
                            )

    entity_label_for_desc = custom_filter.human_readable_entity(
        entity_uri, entity_classes, context_snapshot
    )
    if entity_label_for_desc and entity_label_for_desc != entity_uri:
        description = description.replace(f"'{entity_uri}'", f"'{entity_label_for_desc}'")

    return description

960 

961 

@entity_bp.route("/entity-version/<path:entity_uri>/<timestamp>")
@login_required
def entity_version(entity_uri, timestamp):
    """
    Display a specific version of an entity.

    The requested timestamp is resolved to the closest recorded snapshot,
    whose triples are grouped per display rules and rendered together with
    navigation data (version number, previous/next snapshot) and a textual
    summary of the modifications introduced by that snapshot.

    Args:
        entity_uri: URI of the entity
        timestamp: Timestamp of the version to display; either an ISO 8601
            datetime string or a provenance snapshot identifier

    Raises:
        404: if the timestamp cannot be resolved or the entity has no
            recorded history.
    """
    custom_filter = get_custom_filter()
    form_fields = get_form_fields()
    change_tracking_config = get_change_tracking_config()

    try:
        timestamp_dt = datetime.fromisoformat(timestamp)
    except ValueError:
        # Try to get timestamp from provenance graph: the path segment is
        # then interpreted as a snapshot identifier under the entity's
        # /prov/se/ namespace.
        provenance_sparql = get_provenance_sparql()
        query_timestamp = f"""
            SELECT ?generation_time
            WHERE {{
                <{entity_uri}/prov/se/{timestamp}> <http://www.w3.org/ns/prov#generatedAtTime> ?generation_time.
            }}
        """
        provenance_sparql.setQuery(query_timestamp)
        provenance_sparql.setReturnFormat(JSON)
        try:
            generation_time = provenance_sparql.queryAndConvert()["results"][
                "bindings"
            ][0]["generation_time"]["value"]
        except IndexError:
            # No provenance snapshot matches the identifier.
            abort(404)
        timestamp = generation_time
        timestamp_dt = datetime.fromisoformat(generation_time)

    # Get entity history (including histories of related entities).
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, related_entities_history=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    # Find closest snapshot
    main_entity_history = history.get(entity_uri, {})
    sorted_timestamps = sorted(
        main_entity_history.keys(), key=lambda t: convert_to_datetime(t)
    )

    if not sorted_timestamps:
        abort(404)

    closest_timestamp = min(
        sorted_timestamps,
        key=lambda t: abs(
            convert_to_datetime(t).astimezone() - timestamp_dt.astimezone()
        ),
    )

    version = main_entity_history[closest_timestamp]
    triples = list(version.triples((URIRef(entity_uri), None, None)))

    # Get metadata for the snapshot closest to the request and for the
    # latest snapshot (needed below to detect deletions).
    entity_metadata = provenance.get(entity_uri, {})
    closest_metadata = None
    min_time_diff = None

    # NOTE(review): max() compares timestamp strings lexicographically; this
    # matches chronological order only if all timestamps share one format
    # and offset — confirm against the provenance serialization.
    latest_timestamp = max(sorted_timestamps)
    latest_metadata = None

    for se_uri, meta in entity_metadata.items():
        meta_time = convert_to_datetime(meta["generatedAtTime"])
        time_diff = abs((meta_time - timestamp_dt).total_seconds())

        if closest_metadata is None or time_diff < min_time_diff:
            closest_metadata = meta
            min_time_diff = time_diff

        if meta["generatedAtTime"] == latest_timestamp:
            latest_metadata = meta

    if closest_metadata is None or latest_metadata is None:
        abort(404)

    # Check if this is a deletion snapshot: either the latest snapshot was
    # invalidated, or the snapshot holds no triples for the entity.
    is_deletion_snapshot = (
        closest_timestamp == latest_timestamp
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    ) or len(triples) == 0

    # Use appropriate snapshot for context: for a deletion, labels must be
    # resolved against the state *preceding* the deletion.
    context_version = version
    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        current_index = sorted_timestamps.index(closest_timestamp)
        if current_index > 0:
            context_version = main_entity_history[sorted_timestamps[current_index - 1]]

    # Get subject classes (from the context snapshot when showing a deletion)
    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        subject_classes = [
            o
            for _, _, o in context_version.triples((URIRef(entity_uri), RDF.type, None))
        ]
    else:
        subject_classes = [
            o for _, _, o in version.triples((URIRef(entity_uri), RDF.type, None))
        ]

    # Collapse to the single highest-priority class per display rules.
    subject_classes = [get_highest_priority_class(subject_classes)]

    # Process and group triples
    _, _, _, _, _, _, valid_predicates = get_valid_predicates(triples)
    grouped_triples, relevant_properties = get_grouped_triples(
        entity_uri,
        triples,
        subject_classes,
        valid_predicates,
        historical_snapshot=context_version,
    )

    # Calculate version number: 1-based position among distinct snapshot times.
    snapshot_times = [
        convert_to_datetime(meta["generatedAtTime"])
        for meta in entity_metadata.values()
    ]
    snapshot_times = sorted(set(snapshot_times))
    # NOTE(review): .index() raises ValueError if timestamp_dt is not exactly
    # one of the snapshot times — confirm callers always pass a snapshot time.
    version_number = snapshot_times.index(timestamp_dt) + 1

    # Find next and previous snapshots relative to the requested time.
    next_snapshot_timestamp = None
    prev_snapshot_timestamp = None

    for snap_time in snapshot_times:
        if snap_time > timestamp_dt:
            next_snapshot_timestamp = snap_time.isoformat()
            break

    for snap_time in reversed(snapshot_times):
        if snap_time < timestamp_dt:
            prev_snapshot_timestamp = snap_time.isoformat()
            break

    # Generate modification text if an update query was recorded for this
    # snapshot (i.e. it was produced by a SPARQL UPDATE).
    modifications = ""
    if closest_metadata.get("hasUpdateQuery"):
        sparql_query = closest_metadata["hasUpdateQuery"]
        parsed_modifications = parse_sparql_update(sparql_query)
        modifications = generate_modification_text(
            parsed_modifications,
            subject_classes,
            history=history,
            entity_uri=entity_uri,
            current_snapshot=version,
            current_snapshot_timestamp=closest_timestamp,
            custom_filter=custom_filter,
            form_fields=form_fields,
        )

    try:
        current_index = sorted_timestamps.index(closest_timestamp)
    except ValueError:
        current_index = -1

    # Rewrite raw URIs in the snapshot description with readable labels.
    if closest_metadata.get("description"):
        formatted_description = _format_snapshot_description(
            closest_metadata,
            entity_uri,
            subject_classes,
            context_version,
            history,
            sorted_timestamps,
            current_index,
            custom_filter,
        )
        closest_metadata["description"] = formatted_description

    closest_timestamp = closest_metadata["generatedAtTime"]

    return render_template(
        "entity/version.jinja",
        subject=entity_uri,
        metadata={closest_timestamp: closest_metadata},
        timestamp=closest_timestamp,
        next_snapshot_timestamp=next_snapshot_timestamp,
        prev_snapshot_timestamp=prev_snapshot_timestamp,
        modifications=modifications,
        grouped_triples=grouped_triples,
        subject_classes=subject_classes,
        version_number=version_number,
        version=context_version,
    )

1153 

1154 

@entity_bp.route("/restore-version/<path:entity_uri>/<timestamp>", methods=["POST"])
@login_required
def restore_version(entity_uri, timestamp):
    """
    Restore an entity to a previous version.

    Computes the difference between the current dataset state and the
    historical snapshot, replays the needed deletions/additions through an
    Editor (which records provenance), then redirects back to the entity
    page with a flash message.

    Args:
        entity_uri: URI of the entity to restore
        timestamp: Timestamp of the version to restore to

    Raises:
        404: if no snapshot of the entity exists at the given timestamp.
    """
    timestamp = convert_to_datetime(timestamp, stringify=True)
    change_tracking_config = get_change_tracking_config()

    # Get entity history, including related entities so that references
    # affected by the restore can be repaired as well.
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, related_entities_history=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    historical_graph = history.get(entity_uri, {}).get(timestamp)
    if historical_graph is None:
        abort(404)

    current_graph = fetch_current_state_with_related_entities(provenance)

    # The entity counts as deleted when the current state has no triples
    # with it as subject.
    is_deleted = len(list(current_graph.triples((URIRef(entity_uri), None, None)))) == 0

    triples_or_quads_to_delete, triples_or_quads_to_add = compute_graph_differences(
        current_graph, historical_graph
    )

    # Get all entities that need restoration
    entities_to_restore = get_entities_to_restore(
        triples_or_quads_to_delete, triples_or_quads_to_add, entity_uri
    )

    # Prepare snapshot information for all entities
    entity_snapshots = prepare_entity_snapshots(
        entities_to_restore, provenance, timestamp
    )

    # Create editor instance
    editor = Editor(
        get_dataset_endpoint(),
        get_provenance_endpoint(),
        current_app.config["COUNTER_HANDLER"],
        URIRef(f"https://orcid.org/{current_user.orcid}"),
        # For a deleted entity the source is attached during restoration
        # (below) rather than at construction time.
        None if is_deleted else entity_snapshots[entity_uri]["source"],
        current_app.config["DATASET_GENERATION_TIME"],
        dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
    )

    # Import current state into editor
    if get_dataset_is_quadstore():
        for quad in current_graph.quads():
            editor.g_set.add(quad)
    else:
        for triple in current_graph:
            editor.g_set.add(triple)

    editor.preexisting_finished()

    # Apply deletions (items are quads when len == 4, triples otherwise)
    for item in triples_or_quads_to_delete:
        if len(item) == 4:
            editor.delete(item[0], item[1], item[2], item[3])
        else:
            editor.delete(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                # NOTE(review): deletions record the source under
                # "restoration_source" while additions use "source" — confirm
                # this asymmetry is intended by the g_set/entity_index API.
                editor.g_set.entity_index[URIRef(subject)]["restoration_source"] = (
                    entity_info["source"]
                )

    # Apply additions
    for item in triples_or_quads_to_add:
        if len(item) == 4:
            editor.create(item[0], item[1], item[2], item[3])
        else:
            editor.create(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["source"] = entity_info[
                    "source"
                ]

    # Handle main entity restoration if needed
    if is_deleted and entity_uri in entity_snapshots:
        editor.g_set.mark_as_restored(URIRef(entity_uri))
        source = entity_snapshots[entity_uri]["source"]
        editor.g_set.entity_index[URIRef(entity_uri)]["source"] = source

    try:
        editor.save()
        flash(gettext("Version restored successfully"), "success")
    except Exception as e:
        flash(
            gettext(
                "An error occurred while restoring the version: %(error)s", error=str(e)
            ),
            "error",
        )

    return redirect(url_for("entity.about", subject=entity_uri))

1267 

1268 

def compute_graph_differences(
    current_graph: Graph | ConjunctiveGraph, historical_graph: Graph | ConjunctiveGraph
):
    """
    Compare the current and historical graphs.

    Returns a pair ``(to_delete, to_add)``: statements present only in the
    current graph and statements present only in the historical one. Quads
    are compared when the dataset is a quadstore, plain triples otherwise.
    """
    if get_dataset_is_quadstore():
        present = set(current_graph.quads())
        target = set(historical_graph.quads())
    else:
        present = set(current_graph.triples((None, None, None)))
        target = set(historical_graph.triples((None, None, None)))

    return present - target, target - present

1282 

1283 

def get_entities_to_restore(
    triples_or_quads_to_delete: set, triples_or_quads_to_add: set, main_entity_uri: str
) -> set:
    """
    Identify all entities that need to be restored based on the graph differences.

    Every URI appearing as subject or object of a changed statement (other
    than rdf:type assertions) is collected alongside the main entity.

    Args:
        triples_or_quads_to_delete: Set of triples/quads to be deleted
        triples_or_quads_to_add: Set of triples/quads to be added
        main_entity_uri: URI of the main entity being restored

    Returns:
        Set of entity URIs that need to be restored
    """
    rdf_type = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
    collected = {main_entity_uri}

    for statement in list(triples_or_quads_to_delete) + list(triples_or_quads_to_add):
        # Class assertions do not link to restorable entities.
        if str(statement[1]) == rdf_type:
            continue

        for term in (str(statement[0]), str(statement[2])):
            # Only well-formed URLs other than the main entity qualify;
            # literals fail the URL check and are skipped.
            if term != main_entity_uri and validators.url(term):
                collected.add(term)

    return collected

1312 

1313 

def prepare_entity_snapshots(
    entities_to_restore: set, provenance: dict, target_time: str
) -> dict:
    """
    Prepare snapshot information for all entities that need to be restored.

    Args:
        entities_to_restore: Set of entity URIs to process
        provenance: Dictionary containing provenance data for all entities
        target_time: Target restoration time

    Returns:
        Dictionary mapping entity URIs to their restoration information
        (``source`` snapshot URI and ``needs_restore`` flag).
    """
    snapshots = {}

    for uri in entities_to_restore:
        # Entities without provenance records cannot be restored.
        if uri not in provenance:
            continue

        # Pick the snapshot to restore from; skip entities with none.
        source = find_appropriate_snapshot(provenance[uri], target_time)
        if not source:
            continue

        # The entity is currently deleted when its newest snapshot was
        # generated and invalidated at the same instant.
        ordered = sorted(
            provenance[uri].items(),
            key=lambda item: convert_to_datetime(item[1]["generatedAtTime"]),
        )
        newest = ordered[-1][1]
        currently_deleted = (
            newest.get("invalidatedAtTime")
            and newest["generatedAtTime"] == newest["invalidatedAtTime"]
        )

        snapshots[uri] = {
            "source": source,
            "needs_restore": currently_deleted,
        }

    return snapshots

1357 

1358 

def find_appropriate_snapshot(provenance_data: dict, target_time: str) -> Optional[str]:
    """
    Find the most appropriate snapshot to use as a source for restoration.

    Args:
        provenance_data: Dictionary of snapshots and their metadata for an entity
        target_time: The target restoration time as ISO format string

    Returns:
        The URI of the most appropriate snapshot, or None if no suitable snapshot is found
    """
    cutoff = convert_to_datetime(target_time)

    candidates = []
    for snapshot_uri, meta in provenance_data.items():
        created = convert_to_datetime(meta["generatedAtTime"])

        # A snapshot whose generation and invalidation coincide records a
        # deletion and cannot serve as a restoration source.
        if (
            meta.get("invalidatedAtTime")
            and meta["generatedAtTime"] == meta["invalidatedAtTime"]
        ):
            continue

        # Only snapshots at or before the target time are eligible.
        if created <= cutoff:
            candidates.append((created, snapshot_uri))

    if not candidates:
        return None

    # Stable sort by generation time; the most recent eligible snapshot wins.
    candidates.sort(key=lambda pair: pair[0])
    return candidates[-1][1]

1394 

1395 

def get_inverse_references(subject_uri: str) -> List[Dict]:
    """
    Get all entities that reference this entity.

    Args:
        subject_uri: URI of the entity to find references to

    Returns:
        List of dictionaries containing reference information: each holds
        "subject", "predicate", and "types" (reduced to the single
        highest-priority class of the referring entity).
    """
    sparql = get_sparql()
    # NOTE(review): custom_filter is assigned but never used in this
    # function — confirm whether it can be removed.
    custom_filter = get_custom_filter()

    # Build appropriate query based on triplestore type.
    # NOTE(review): `is_virtuoso` is referenced without being called; if it
    # is a function object this condition is always truthy — confirm it is
    # a module-level flag.
    if is_virtuoso:
        # Virtuoso stores data in named graphs; exclude its system graphs.
        query = f"""
        SELECT DISTINCT ?s ?p ?g WHERE {{
            GRAPH ?g {{
                ?s ?p <{subject_uri}> .
            }}
            FILTER(?g NOT IN (<{'>, <'.join(VIRTUOSO_EXCLUDED_GRAPHS)}>))
            FILTER(?p != <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>)
        }}
        """
    else:
        query = f"""
        SELECT DISTINCT ?s ?p WHERE {{
            ?s ?p <{subject_uri}> .
            FILTER(?p != <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>)
        }}
        """

    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)
    results = sparql.query().convert()

    references = []
    for result in results["results"]["bindings"]:
        subject = result["s"]["value"]
        predicate = result["p"]["value"]

        # Get the type of the referring entity.
        # NOTE(review): this issues one extra query per referencing entity
        # (N+1 pattern); consider folding the type lookup into the main
        # query if result sets grow large.
        type_query = f"""
        SELECT ?type WHERE {{
            <{subject}> a ?type .
        }}
        """
        sparql.setQuery(type_query)
        type_results = sparql.query().convert()
        types = [t["type"]["value"] for t in type_results["results"]["bindings"]]
        types = [get_highest_priority_class(types)]

        references.append({"subject": subject, "predicate": predicate, "types": types})

    return references

1451 

1452 

def generate_modification_text(
    modifications,
    subject_classes,
    history,
    entity_uri,
    current_snapshot,
    current_snapshot_timestamp,
    custom_filter: Filter,
    form_fields,
):
    """
    Generate HTML text describing modifications to an entity, using display rules for property ordering.

    Args:
        modifications (dict): Dictionary of modifications from parse_sparql_update
        subject_classes (list): List of classes for the subject entity
        history (dict): Historical snapshots dictionary
        entity_uri (str): URI of the entity being modified
        current_snapshot (Graph): Current entity snapshot
        current_snapshot_timestamp (str): Timestamp of current snapshot
        custom_filter (Filter): Filter instance for formatting
        form_fields (dict): Form fields configuration from SHACL

    Returns:
        HTML string summarizing additions and deletions.
    """
    html = "<p><strong>" + gettext("Modifications") + "</strong></p>"

    # Property order comes from the display rules for the subject's classes.
    display_rules = get_display_rules()
    ordered_properties = get_property_order_from_rules(subject_classes, display_rules)

    for mod_type, triples in modifications.items():
        html += "<ul class='list-group mb-3'><p>"
        if mod_type == gettext("Additions"):
            html += '<i class="bi bi-plus-circle-fill text-success"></i>'
        elif mod_type == gettext("Deletions"):
            html += '<i class="bi bi-dash-circle-fill text-danger"></i>'
        html += " <em>" + gettext(mod_type) + "</em></p>"

        # Group this section's triples by predicate URI.
        by_predicate = {}
        for triple in triples:
            by_predicate.setdefault(str(triple[1]), []).append(triple)

        def render(triple):
            # One <li> per triple, formatted with the shared helper.
            return format_triple_modification(
                triple,
                subject_classes,
                mod_type,
                history,
                entity_uri,
                current_snapshot,
                current_snapshot_timestamp,
                custom_filter,
                form_fields,
            )

        # Emit predicates in display-rule order first...
        handled = set()
        for predicate in ordered_properties:
            if predicate in by_predicate:
                handled.add(predicate)
                for triple in by_predicate[predicate]:
                    html += render(triple)

        # ...then whatever predicates the rules did not mention.
        for predicate, group in by_predicate.items():
            if predicate not in handled:
                for triple in group:
                    html += render(triple)

        html += "</ul>"

    return html

1537 

1538 

def format_triple_modification(
    triple,
    subject_classes,
    mod_type,
    history,
    entity_uri,
    current_snapshot,
    current_snapshot_timestamp,
    custom_filter: Filter,
    form_fields,
):
    """
    Format a single triple modification as HTML.

    Args:
        triple: The RDF triple being modified
        subject_classes: List of classes for the subject entity
        mod_type: Type of modification (addition/deletion)
        history: Historical snapshots dictionary
        entity_uri: URI of the entity being modified
        current_snapshot: Current entity snapshot
        current_snapshot_timestamp: Timestamp of current snapshot
        custom_filter: Filter instance for formatting
        form_fields: Form fields configuration from SHACL

    Returns:
        HTML string representing the triple modification
    """
    predicate = triple[1]
    object_value = triple[2]
    predicate_label = custom_filter.human_readable_predicate(predicate, subject_classes)

    # For deletions, labels must be resolved against the snapshot that
    # *preceded* the current one: the deleted value may no longer exist in
    # the current state.
    relevant_snapshot = None
    if (
        mod_type == gettext("Deletions")
        and history
        and entity_uri
        and current_snapshot_timestamp
    ):
        timeline = sorted(history[entity_uri].keys())
        position = timeline.index(current_snapshot_timestamp)
        if position > 0:
            relevant_snapshot = history[entity_uri][timeline[position - 1]]
    else:
        relevant_snapshot = current_snapshot

    subject_class = get_highest_priority_class(subject_classes)

    object_label = get_object_label(
        object_value,
        predicate,
        subject_class,
        form_fields,
        relevant_snapshot,
        custom_filter,
    )

    return f"""
        <li class='d-flex align-items-center'>
            <span class='flex-grow-1 d-flex flex-column justify-content-center ms-3 mb-2 w-100'>
                <strong>{predicate_label}</strong>
                <span class="object-value word-wrap">{object_label}</span>
            </span>
        </li>"""

1605 

1606 

def get_object_label(
    object_value: str,
    predicate: str,
    entity_type: str,
    form_fields: dict,
    snapshot: Optional[Graph],
    custom_filter: Filter,
) -> str:
    """
    Get appropriate display label for an object value based on form fields configuration.

    Args:
        object_value: The value to get a label for
        predicate: The predicate URI
        entity_type: The type of the entity
        form_fields: Form fields configuration from SHACL
        snapshot: Optional graph snapshot for context
        custom_filter: Custom filter instance for formatting

    Returns:
        A human-readable label for the object value
    """
    entity_type = str(entity_type)
    predicate = str(predicate)

    # rdf:type objects render as a title-cased class label.
    if predicate == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
        return custom_filter.human_readable_predicate(
            object_value, [entity_type]
        ).title()

    matching_fields = (
        form_fields[entity_type].get(predicate, [])
        if form_fields and entity_type in form_fields
        else []
    )
    for field in matching_fields:
        # Entity references (node shapes / object classes) that carry a URL
        # get a full entity label resolved against the snapshot.
        if (field.get("nodeShape") or field.get("objectClass")) and validators.url(
            object_value
        ):
            referenced_classes = []
            if snapshot:
                referenced_classes = [
                    str(o)
                    for s, p, o in snapshot.triples(
                        (URIRef(object_value), RDF.type, None)
                    )
                ]
            # Fall back to the SHACL-declared class when the snapshot
            # yields none.
            if not referenced_classes and field.get("objectClass"):
                referenced_classes = [field["objectClass"]]
            return custom_filter.human_readable_entity(
                object_value, referenced_classes, snapshot
            )

        # Mandatory values and members of a predefined option set both get
        # the predicate-style label.
        if field.get("hasValue") == object_value or object_value in field.get(
            "optionalValues", []
        ):
            return custom_filter.human_readable_predicate(
                object_value, [entity_type]
            )

    # Literal values pass through unchanged.
    if not validators.url(object_value):
        return object_value

    # Any remaining URI falls back to the predicate-style label.
    return custom_filter.human_readable_predicate(object_value, [entity_type])

1679 

1680 

def process_modification_data(data: dict) -> Tuple[str, List[dict]]:
    """
    Process modification data to extract subjects and predicates.

    Args:
        data: Dictionary containing modification data

    Returns:
        Tuple containing subject URI and list of modification details

    Raises:
        ValueError: if the subject URI or the modification list is missing
            or empty.
    """
    subject = data.get("subject")
    if not subject:
        raise ValueError("No subject URI provided in modification data")

    mods = data.get("modifications", [])
    if not mods:
        raise ValueError("No modifications provided in data")

    return subject, mods

1700 

1701 

def validate_modification(
    modification: dict, subject_uri: str, form_fields: dict
) -> Tuple[bool, str]:
    """
    Validate a single modification operation.

    Args:
        modification: Dictionary containing modification details
        subject_uri: URI of the subject being modified
        form_fields: Form fields configuration from SHACL

    Returns:
        Tuple of (is_valid, error_message); the message is empty on success.
    """
    operation = modification.get("operation")
    if not operation:
        return False, "No operation specified in modification"

    predicate = modification.get("predicate")
    if not predicate:
        return False, "No predicate specified in modification"

    if operation not in ("add", "remove", "update"):
        return False, f"Invalid operation: {operation}"

    # SHACL-driven cardinality checks, only when a configuration exists.
    if form_fields:
        entity_types = [str(t) for t in get_entity_types(subject_uri)]
        entity_type = get_highest_priority_class(entity_types)

        if entity_type in form_fields:
            for field in form_fields[entity_type].get(predicate, []):
                # A required predicate (minCount > 0) must not be removed.
                if operation == "remove" and field.get("minCount", 0) > 0:
                    return False, f"Cannot remove required predicate: {predicate}"

                # Adding must not push the value count past maxCount.
                if operation == "add":
                    current_count = get_predicate_count(subject_uri, predicate)
                    max_count = field.get("maxCount")
                    if max_count and current_count >= max_count:
                        return (
                            False,
                            f"Maximum count exceeded for predicate: {predicate}",
                        )

    return True, ""

1750 

1751 

def get_predicate_count(subject_uri: str, predicate: str) -> int:
    """
    Get the current count of values for a predicate.

    Args:
        subject_uri: URI of the entity
        predicate: Predicate URI to count

    Returns:
        Number of values for the predicate
    """
    sparql = get_sparql()

    count_query = f"""
    SELECT (COUNT(?o) as ?count) WHERE {{
        <{subject_uri}> <{predicate}> ?o .
    }}
    """

    sparql.setQuery(count_query)
    sparql.setReturnFormat(JSON)
    bindings = sparql.query().convert()["results"]["bindings"]

    return int(bindings[0]["count"]["value"])

1776 

1777 

def apply_modifications(
    editor: Editor,
    modifications: List[dict],
    subject_uri: str,
    graph_uri: Optional[str] = None,
):
    """
    Apply a list of modifications to an entity.

    Args:
        editor: Editor instance to use for modifications
        modifications: List of modification operations
        subject_uri: URI of the entity being modified
        graph_uri: Optional graph URI for quad store
    """
    subject = URIRef(subject_uri)

    def to_rdf_term(value, datatype):
        # Well-formed URLs become URIRefs; anything else a typed literal.
        if validators.url(value):
            return URIRef(value)
        return Literal(value, datatype=URIRef(datatype))

    for mod in modifications:
        operation = mod["operation"]
        predicate = URIRef(mod["predicate"])

        if operation == "remove":
            editor.delete(subject, predicate, graph_uri=graph_uri)

        elif operation == "add":
            datatype = mod.get("datatype", XSD.string)
            editor.create(
                subject, predicate, to_rdf_term(mod["value"], datatype), graph_uri
            )

        elif operation == "update":
            datatype = mod.get("datatype", XSD.string)
            editor.update(
                subject,
                predicate,
                to_rdf_term(mod["oldValue"], datatype),
                to_rdf_term(mod["newValue"], datatype),
                graph_uri,
            )