Coverage for heritrace / routes / entity.py: 89%
783 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import json
6import re
7from datetime import datetime
8from typing import List, Optional, Tuple
10import validators
11from flask import (Blueprint, abort, current_app, flash, jsonify, redirect,
12 render_template, request, url_for)
13from flask_babel import gettext
14from flask_login import current_user, login_required
15from rdflib import RDF, XSD, Dataset, Graph, Literal, URIRef
16from SPARQLWrapper import JSON
17from time_agnostic_library.agnostic_entity import AgnosticEntity
19from heritrace.apis.orcid import get_responsible_agent_uri
20from heritrace.editor import Editor
21from heritrace.extensions import (get_change_tracking_config,
22 get_custom_filter, get_dataset_endpoint,
23 get_dataset_is_quadstore, get_display_rules,
24 get_form_fields, get_provenance_endpoint,
25 get_provenance_sparql, get_shacl_graph,
26 get_sparql)
27from heritrace.forms import *
28from heritrace.utils.converters import convert_to_datetime
29from heritrace.utils.datatypes import DATATYPE_MAPPING, get_datatype_options
30from heritrace.utils.display_rules_utils import (
31 get_class_priority, get_grouped_triples, get_highest_priority_class,
32 get_predicate_ordering_info, get_property_order_from_rules,
33 get_shape_order_from_display_rules, is_entity_type_visible)
34from heritrace.utils.filters import Filter
35from heritrace.utils.primary_source_utils import (
36 get_default_primary_source, save_user_default_primary_source)
37from heritrace.utils.shacl_utils import (determine_shape_for_entity_triples,
38 find_matching_form_field,
39 get_entity_position_in_sequence)
40from heritrace.utils.shacl_validation import get_valid_predicates
41from heritrace.utils.sparql_utils import (
42 convert_to_rdflib_graphs, determine_shape_for_classes,
43 fetch_current_state_with_related_entities, fetch_data_graph_for_subject,
44 get_entity_types, get_triples_from_graph, import_referenced_entities,
45 parse_sparql_update)
46from heritrace.utils.uri_utils import generate_unique_uri
47from heritrace.utils.virtual_properties import \
48 get_virtual_properties_for_entity, \
49 transform_entity_creation_with_virtual_properties, \
50 remove_virtual_properties_from_creation_data
def _prepare_entity_creation_data(structured_data):
    """
    Strip virtual properties from submitted creation data and pull out the
    core fields needed to create the entity.

    Returns:
        Tuple of (cleaned_structured_data, entity_type, properties, entity_uri)
    """
    cleaned = remove_virtual_properties_from_creation_data(structured_data)
    entity_type = cleaned.get("entity_type")
    # Mint a fresh URI for the new entity based on its type.
    new_uri = generate_unique_uri(entity_type)
    return cleaned, entity_type, cleaned.get("properties", {}), new_uri
def _setup_editor_for_creation(editor, cleaned_structured_data):
    """
    Prime the editor for entity creation: pull in any referenced entities,
    then mark all preexisting data as finalized.
    """
    import_referenced_entities(editor, cleaned_structured_data)
    editor.preexisting_finished()
def _process_virtual_properties_after_creation(editor, structured_data, entity_uri, default_graph_uri):
    """
    Derive and persist virtual-property entities for a freshly created entity.

    When the transformation yields no virtual entities, nothing is created
    and no save is triggered.
    """
    virtual_entities = transform_entity_creation_with_virtual_properties(
        structured_data, str(entity_uri)
    )
    if not virtual_entities:
        return

    for virtual_entity in virtual_entities:
        new_uri = generate_unique_uri(virtual_entity.get("entity_type"))
        create_nested_entity(editor, new_uri, virtual_entity, default_graph_uri)

    # Save the virtual entities
    editor.save()
# Blueprint grouping all entity-related routes (about, create-entity, entity-history, ...).
entity_bp = Blueprint("entity", __name__)
def get_deleted_entity_context_info(is_deleted: bool, sorted_timestamps: List[str],
                                    history: dict, subject: str) -> Tuple[Optional[Graph], Optional[str], Optional[str]]:
    """
    Extract display context for a deleted entity that has prior snapshots.

    For a deleted entity with more than one timestamp, the snapshot at the
    second-to-last timestamp is used as context, from which the entity's
    highest-priority class and matching shape are derived.

    Args:
        is_deleted: Whether the entity is deleted
        sorted_timestamps: List of timestamps in chronological order
        history: Dictionary mapping subject -> timestamp -> Graph
        subject: The entity URI as string

    Returns:
        Tuple of (context_snapshot, highest_priority_class, entity_shape),
        or (None, None, None) when the entity is not deleted or has a
        single snapshot only.
    """
    # Guard: only deleted entities with at least one earlier snapshot qualify.
    if not (is_deleted and len(sorted_timestamps) > 1):
        return None, None, None

    context_snapshot = history[subject][sorted_timestamps[-2]]
    subject_ref = URIRef(subject)

    type_objects = [
        obj
        for _, _, obj in get_triples_from_graph(
            context_snapshot, (subject_ref, RDF.type, None)
        )
    ]

    priority_class = get_highest_priority_class(type_objects)
    shape = determine_shape_for_entity_triples(
        list(get_triples_from_graph(context_snapshot, (subject_ref, None, None)))
    )

    return context_snapshot, priority_class, shape
@entity_bp.route("/about/<path:subject>")
@login_required
def about(subject):
    """
    Display detailed information about an entity.

    Renders the entity page with its current triples, editability metadata
    (which predicates can be added/deleted), and its change history. For a
    deleted entity, the snapshot preceding deletion is used as display
    context instead of live data.

    Args:
        subject: URI of the entity to display
    """
    change_tracking_config = get_change_tracking_config()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # Reconstruct the entity's full history (plus provenance metadata).
    agnostic_entity = AgnosticEntity(
        res=subject, config=change_tracking_config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)
    history = convert_to_rdflib_graphs(history, get_dataset_is_quadstore())

    is_deleted = False
    context_snapshot = None
    subject_classes = []
    highest_priority_class = None
    entity_shape = None

    if history.get(subject):
        sorted_timestamps = sorted(history[subject].keys())
        # Provenance record whose generation time matches the latest snapshot.
        latest_metadata = next(
            (
                meta
                for _, meta in provenance[subject].items()
                if meta["generatedAtTime"] == sorted_timestamps[-1]
            ),
            None,
        )

        # Deleted means the latest snapshot carries an invalidation time.
        is_deleted = (
            latest_metadata
            and "invalidatedAtTime" in latest_metadata
            and latest_metadata["invalidatedAtTime"]
        )

        context_snapshot, highest_priority_class, entity_shape = get_deleted_entity_context_info(
            is_deleted, sorted_timestamps, history, subject
        )

    grouped_triples = {}
    can_be_added = []
    can_be_deleted = []
    datatypes = {}
    mandatory_values = {}
    optional_values = {}
    valid_predicates = []
    data_graph = None

    if not is_deleted:
        data_graph = fetch_data_graph_for_subject(subject)

        # Check if entity exists - if no history and no data_graph, entity doesn't exist
        if not history.get(subject) and (not data_graph or len(data_graph) == 0):
            abort(404)

        if data_graph:
            # Use helper function to handle both Graph and Dataset correctly
            triples = list(get_triples_from_graph(data_graph, (None, None, None)))
            subject_classes = [o for s, p, o in get_triples_from_graph(data_graph, (URIRef(subject), RDF.type, None))]
            subject_triples = list(get_triples_from_graph(data_graph, (URIRef(subject), None, None)))

            highest_priority_class = get_highest_priority_class(subject_classes)
            entity_shape = determine_shape_for_entity_triples(subject_triples)

            # Determine which predicates are editable under SHACL constraints.
            (
                can_be_added,
                can_be_deleted,
                datatypes,
                mandatory_values,
                optional_values,
                valid_predicates,
            ) = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

            grouped_triples, relevant_properties = get_grouped_triples(
                subject, triples, valid_predicates, highest_priority_class=highest_priority_class, highest_priority_shape=entity_shape
            )

            virtual_properties = get_virtual_properties_for_entity(highest_priority_class, entity_shape)

            # Keep only predicates relevant for display, then append virtual ones.
            can_be_added = [uri for uri in can_be_added if uri in relevant_properties] + [vp[0] for vp in virtual_properties]
            can_be_deleted = [
                uri for uri in can_be_deleted if uri in relevant_properties
            ] + [vp[0] for vp in virtual_properties]

    update_form = UpdateTripleForm()

    form_fields = get_form_fields()

    datatype_options = get_datatype_options()

    # Index field details by (predicate, entity-type key, node shape) for the template.
    predicate_details_map = {}
    for entity_type_key, predicates in form_fields.items():
        for predicate_uri, details_list in predicates.items():
            for details in details_list:
                shape = details.get("nodeShape")
                key = (predicate_uri, entity_type_key, shape)
                predicate_details_map[key] = details

    return render_template(
        "entity/about.jinja",
        subject=subject,
        history=history,
        can_be_added=can_be_added,
        can_be_deleted=can_be_deleted,
        datatypes=datatypes,
        update_form=update_form,
        mandatory_values=mandatory_values,
        optional_values=optional_values,
        shacl=bool(len(get_shacl_graph())),
        grouped_triples=grouped_triples,
        display_rules=get_display_rules(),
        form_fields=form_fields,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        predicate_details_map=predicate_details_map,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        is_deleted=is_deleted,
        context=context_snapshot,
        default_primary_source=default_primary_source,
        datatype_options=datatype_options,
    )
@entity_bp.route("/create-entity", methods=["GET", "POST"])
@login_required
def create_entity():
    """
    Create a new entity in the dataset.

    GET renders the creation form. POST validates the submitted structured
    data, writes the new entity (and any nested/virtual entities) through
    the Editor, and returns JSON with a redirect URL on success or a list
    of error messages on failure.
    """
    form_fields = get_form_fields()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # Visible (class, shape) pairs, highest display priority first.
    entity_class_shape_pairs = sorted(
        [
            entity_key
            for entity_key in form_fields.keys()
            if is_entity_type_visible(entity_key)
        ],
        key=lambda et: get_class_priority(et),
        reverse=True,
    )

    datatype_options = get_datatype_options()

    if request.method == "POST":
        structured_data = json.loads(request.form.get("structured_data", "{}"))
        primary_source = request.form.get("primary_source") or None
        save_default_source = request.form.get("save_default_source") == 'true'

        if primary_source and not validators.url(primary_source):
            return jsonify({"status": "error", "errors": [gettext("Invalid primary source URL provided")]}), 400

        # Optionally persist this primary source as the user's default.
        if save_default_source and primary_source and validators.url(primary_source):
            save_user_default_primary_source(current_user.orcid, primary_source)

        editor = Editor(
            get_dataset_endpoint(),
            get_provenance_endpoint(),
            current_app.config["COUNTER_HANDLER"],
            URIRef(get_responsible_agent_uri(current_user.orcid)),
            primary_source,
            current_app.config["DATASET_GENERATION_TIME"],
            dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        )

        if not structured_data.get("entity_type"):
            return jsonify({"status": "error", "errors": [gettext("Entity type is required")]}), 400

        # Prepare common data for entity creation
        cleaned_structured_data, entity_type, properties, entity_uri = _prepare_entity_creation_data(structured_data)

        # Quadstores write each entity into its own named graph.
        default_graph_uri = (
            URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
        )

        if form_fields:
            # SHACL-driven path: validate first, then create each property
            # according to its matching field definition.
            validation_errors = validate_entity_data(cleaned_structured_data)
            if validation_errors:
                return jsonify({"status": "error", "errors": validation_errors}), 400

            _setup_editor_for_creation(editor, cleaned_structured_data)

            for predicate, values in properties.items():
                if not isinstance(values, list):
                    values = [values]
                entity_shape = cleaned_structured_data.get("entity_shape")
                matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

                field_definitions = form_fields.get(matching_key, {}).get(predicate, []) if matching_key else []

                # Get the shape from the property value if available
                property_shape = None
                if values and isinstance(values[0], dict):
                    property_shape = values[0].get("shape")

                # Filter field definitions to find the matching one based on shape
                matching_field_def = None
                for field_def in field_definitions:
                    if property_shape:
                        # If property has a shape, match it with the field definition's subjectShape
                        if field_def.get("subjectShape") == property_shape:
                            matching_field_def = field_def
                            break
                    else:
                        # If no shape specified, use the first field definition without a shape requirement
                        if not field_def.get("subjectShape"):
                            matching_field_def = field_def
                            break

                # If no matching field definition found, use the first one (default behavior)
                if not matching_field_def and field_definitions:
                    matching_field_def = field_definitions[0]

                ordered_by = (
                    matching_field_def.get("orderedBy") if matching_field_def else None
                )

                if ordered_by:
                    process_ordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, ordered_by
                    )
                else:
                    # Handle unordered properties
                    process_unordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, matching_field_def
                    )
        else:
            # No SHACL form fields configured: create triples directly from
            # the raw payload ({"type": "uri"|"literal", "value": ..., ...}).
            editor.import_entity(entity_uri)
            _setup_editor_for_creation(editor, cleaned_structured_data)

            editor.create(
                entity_uri,
                RDF.type,
                URIRef(entity_type),
                default_graph_uri,
            )

            for predicate, values in properties.items():
                for value_dict in values:
                    if value_dict["type"] == "uri":
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            URIRef(value_dict["value"]),
                            default_graph_uri,
                        )
                    elif value_dict["type"] == "literal":
                        # Fall back to xsd:string when no datatype is supplied.
                        datatype = (
                            URIRef(value_dict["datatype"])
                            if "datatype" in value_dict
                            else XSD.string
                        )
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            Literal(value_dict["value"], datatype=datatype),
                            default_graph_uri,
                        )

        try:
            # Save the main entity first
            editor.save()

            # Process virtual properties after creation
            _process_virtual_properties_after_creation(editor, structured_data, entity_uri, default_graph_uri)

            response = jsonify(
                {
                    "status": "success",
                    "redirect_url": url_for("entity.about", subject=str(entity_uri)),
                }
            )
            flash(gettext("Entity created successfully"), "success")
            return response, 200
        except Exception as e:
            error_message = gettext(
                "An error occurred while creating the entity: %(error)s", error=str(e)
            )
            return jsonify({"status": "error", "errors": [error_message]}), 500

    return render_template(
        "create_entity.jinja",
        datatype_options=datatype_options,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        default_primary_source=default_primary_source,
        shacl=bool(form_fields),
        entity_class_shape_pairs=entity_class_shape_pairs
    )
def create_nested_entity(
    editor: Editor, entity_uri, entity_data, graph_uri=None
):
    """
    Recursively create an entity (and its nested entities) from structured data.

    Writes the rdf:type triple first, then each property: nested entity dicts
    are created recursively (through an intermediate entity when an
    "intermediateRelation" is declared), existing-entity references are linked
    directly, and simple values become URI or typed-literal objects.

    Args:
        editor: Editor instance used to emit the create operations.
        entity_uri: URI assigned to the entity being created.
        entity_data: Dict with "entity_type", optional "entity_shape", and a
            "properties" mapping of predicate URI -> value(s).
        graph_uri: Optional named graph to write into (quadstore only).
    """
    form_fields = get_form_fields()

    editor.create(
        entity_uri,
        URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
        URIRef(entity_data["entity_type"]),
        graph_uri,
    )

    entity_type = entity_data.get("entity_type")
    entity_shape = entity_data.get("entity_shape")
    properties = entity_data.get("properties", {})

    matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    # Without a matching form-field definition, only the type triple is written.
    if not matching_key:
        return

    # Add other properties
    for predicate, values in properties.items():
        if not isinstance(values, list):
            values = [values]
        field_definitions = form_fields[matching_key].get(predicate, [])
        for value in values:
            if isinstance(value, dict) and "entity_type" in value:
                if "intermediateRelation" in value:
                    # Link through an intermediate node: parent -> intermediate -> target.
                    intermediate_uri = generate_unique_uri(
                        value["intermediateRelation"]["class"]
                    )
                    target_uri = generate_unique_uri(value["entity_type"])
                    editor.create(
                        entity_uri, URIRef(predicate), intermediate_uri, graph_uri
                    )
                    editor.create(
                        intermediate_uri,
                        URIRef(value["intermediateRelation"]["property"]),
                        target_uri,
                        graph_uri,
                    )
                    create_nested_entity(
                        editor, target_uri, value, graph_uri
                    )
                else:
                    # Handle nested entities
                    nested_uri = generate_unique_uri(value["entity_type"])
                    editor.create(entity_uri, URIRef(predicate), nested_uri, graph_uri)
                    create_nested_entity(
                        editor, nested_uri, value, graph_uri
                    )
            elif isinstance(value, dict) and value.get("is_existing_entity", False):
                # Reference to an already-existing entity; silently skipped
                # when no entity_uri is present.
                existing_entity_uri = value.get("entity_uri")
                if existing_entity_uri:
                    editor.create(entity_uri, URIRef(predicate), URIRef(existing_entity_uri), graph_uri)
            else:
                # Handle simple properties - check if it's a URI or literal
                if validators.url(str(value)):
                    object_value = URIRef(value)
                else:
                    datatype = XSD.string  # Default to string if not specified
                    datatype_uris = []
                    if field_definitions:
                        datatype_uris = field_definitions[0].get("datatypes", [])
                    datatype = determine_datatype(value, datatype_uris)
                    object_value = Literal(value, datatype=datatype)
                editor.create(entity_uri, URIRef(predicate), object_value, graph_uri)
def process_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri, matching_field_def):
    """
    Process a single property value for an entity.

    Dispatches on the value's form: a nested-entity payload is created
    recursively, an existing-entity reference is linked directly, and any
    other value becomes a URI or typed-literal object.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        value: Value to process (dict or primitive)
        default_graph_uri: Default graph URI for quad stores
        matching_field_def: Field definition for datatype validation

    Returns:
        The URIRef or Literal that was linked to the entity.

    Raises:
        ValueError: If an existing-entity reference lacks an entity_uri.
    """
    value_is_dict = isinstance(value, dict)

    if value_is_dict and "entity_type" in value:
        # Nested entity: mint a URI, link it, then build it recursively.
        nested_uri = generate_unique_uri(value["entity_type"])
        editor.create(entity_uri, URIRef(predicate), nested_uri, default_graph_uri)
        create_nested_entity(editor, nested_uri, value, default_graph_uri)
        return nested_uri

    if value_is_dict and value.get("is_existing_entity", False):
        # Reference to an entity that already exists in the dataset.
        referenced_uri = value.get("entity_uri")
        if not referenced_uri:
            raise ValueError("Missing entity_uri in existing entity reference")
        object_value = URIRef(referenced_uri)
        editor.create(entity_uri, URIRef(predicate), object_value, default_graph_uri)
        return object_value

    # Handle simple properties - check if it's a URI or literal
    if validators.url(str(value)):
        object_value = URIRef(value)
    else:
        candidate_datatypes = matching_field_def.get("datatypes", []) if matching_field_def else []
        object_value = Literal(value, datatype=determine_datatype(value, candidate_datatypes))
    editor.create(entity_uri, URIRef(predicate), object_value, default_graph_uri)
    return object_value
def process_ordered_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri):
    """
    Process a single entity value for ordered properties.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        value: Value to process (dict)
        default_graph_uri: Default graph URI for quad stores

    Returns:
        URIRef: The URI of the created/referenced entity

    Raises:
        ValueError: If the value is neither a nested entity nor a valid
            existing-entity reference.
    """
    if isinstance(value, dict) and "entity_type" in value:
        # New nested entity: mint a URI, link it, and create it recursively.
        nested_uri = generate_unique_uri(value["entity_type"])
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        create_nested_entity(
            editor,
            nested_uri,
            value,
            default_graph_uri
        )
        return nested_uri
    elif isinstance(value, dict) and value.get("is_existing_entity", False):
        # Reference to an existing entity.
        # BUG FIX: the previous code called URIRef(value) on the whole dict,
        # which stringified the dict into an invalid URI. Use the entity_uri
        # field instead, consistent with process_entity_value.
        existing_entity_uri = value.get("entity_uri")
        if not existing_entity_uri:
            raise ValueError("Missing entity_uri in existing entity reference")
        nested_uri = URIRef(existing_entity_uri)
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        return nested_uri
    else:
        raise ValueError("Unexpected value type for ordered property")
def process_ordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, ordered_by):
    """
    Process ordered properties: group values by entity shape, then chain each
    group with the ordering predicate so sequence order is preserved.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        values: List of values to process
        default_graph_uri: Default graph URI for quad stores
        ordered_by: URI of the ordering property
    """
    # Bucket values by their declared shape (falsy shapes share one bucket).
    grouped_by_shape = {}
    for item in values:
        shape_key = item.get("entity_shape") or "default_shape"
        grouped_by_shape.setdefault(shape_key, []).append(item)

    for group in grouped_by_shape.values():
        previous_uri = None
        for item in group:
            current_uri = process_ordered_entity_value(
                editor, entity_uri, predicate, item, default_graph_uri
            )
            if previous_uri:
                # Chain previous -> current via the ordering predicate.
                editor.create(
                    previous_uri,
                    URIRef(ordered_by),
                    current_uri,
                    default_graph_uri,
                )
            previous_uri = current_uri
def process_unordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, matching_field_def):
    """
    Process unordered property values one by one.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        values: List of values to process
        default_graph_uri: Default graph URI for quad stores
        matching_field_def: Field definition for datatype validation
    """
    for single_value in values:
        process_entity_value(
            editor, entity_uri, predicate, single_value, default_graph_uri, matching_field_def
        )
def determine_datatype(value, datatype_uris):
    """
    Return the first datatype URI from *datatype_uris* whose registered
    validator accepts *value*; fall back to xsd:string when none match.
    """
    for candidate_uri in datatype_uris:
        # Look up the validator registered for this datatype URI.
        validator = next(
            (entry[1] for entry in DATATYPE_MAPPING if str(entry[0]) == str(candidate_uri)),
            None,
        )
        if validator and validator(value):
            return URIRef(candidate_uri)
    # If none match, default to XSD.string
    return XSD.string
def validate_entity_data(structured_data):
    """
    Validates entity data against form field definitions, considering shape matching.

    Checks, per property: that the property is known for the entity type,
    cardinality (min/max), mandatory and permitted values, datatype validity,
    and recursively validates nested entity payloads. Also reports required
    properties that are entirely absent from the input.

    Args:
        structured_data (dict): Data to validate containing entity_type and properties

    Returns:
        list: List of validation error messages, empty if validation passes
    """
    custom_filter = get_custom_filter()
    form_fields = get_form_fields()

    errors = []
    entity_type = structured_data.get("entity_type")
    entity_shape = structured_data.get("entity_shape")

    if not entity_type:
        errors.append(gettext("Entity type is required"))
        return errors

    entity_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    if not entity_key:
        errors.append(f"No form fields found for entity type: {entity_type}" +
                      (f" and shape: {entity_shape}" if entity_shape else ""))
        return errors

    entity_fields = form_fields[entity_key]
    properties = structured_data.get("properties", {})

    for prop_uri, prop_values in properties.items():
        # rdf:type is implied by entity_type and is not validated as a property.
        if URIRef(prop_uri) == RDF.type:
            continue

        field_definitions = entity_fields.get(prop_uri)
        if not field_definitions:
            errors.append(
                gettext(
                    "Unknown property %(prop_uri)s for entity type %(entity_type)s",
                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                    entity_type=custom_filter.human_readable_class(entity_key),
                )
            )
            continue

        if not isinstance(prop_values, list):
            prop_values = [prop_values]

        # The shape declared on the first value selects the field definition.
        property_shape = None
        if prop_values and isinstance(prop_values[0], dict):
            property_shape = prop_values[0].get("shape")

        matching_field_def = None
        for field_def in field_definitions:
            if property_shape:
                if field_def.get("subjectShape") == property_shape:
                    matching_field_def = field_def
                    break
            else:
                if not field_def.get("subjectShape"):
                    matching_field_def = field_def
                    break

        # Fall back to the first definition when no shape-specific match exists.
        if not matching_field_def and field_definitions:
            matching_field_def = field_definitions[0]

        if matching_field_def:
            # Cardinality: number of supplied values must fit [min, max].
            min_count = matching_field_def.get("min", 0)
            max_count = matching_field_def.get("max", None)
            value_count = len(prop_values)

            if value_count < min_count:
                value = gettext("values") if min_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s requires at least %(min_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        min_count=min_count,
                        value=value,
                    )
                )
            if max_count is not None and value_count > max_count:
                value = gettext("values") if max_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s allows at most %(max_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        max_count=max_count,
                        value=value,
                    )
                )

            # Values the schema insists must be present.
            mandatory_values = matching_field_def.get("mandatory_values", [])
            for mandatory_value in mandatory_values:
                if mandatory_value not in prop_values:
                    errors.append(
                        gettext(
                            "Property %(prop_uri)s requires the value %(mandatory_value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            mandatory_value=mandatory_value,
                        )
                    )

            for value in prop_values:
                if isinstance(value, dict) and "entity_type" in value:
                    # Nested entity payload: validate recursively.
                    nested_errors = validate_entity_data(value)
                    errors.extend(nested_errors)
                else:
                    # Datatype check: the value must satisfy at least one
                    # registered validator among the declared datatypes.
                    datatypes = matching_field_def.get("datatypes", [])
                    if datatypes:
                        is_valid_datatype = False
                        for dtype in datatypes:
                            validation_func = next(
                                (
                                    d[1]
                                    for d in DATATYPE_MAPPING
                                    if d[0] == URIRef(dtype)
                                ),
                                None,
                            )
                            if validation_func and validation_func(value):
                                is_valid_datatype = True
                                break
                        if not is_valid_datatype:
                            expected_types = ", ".join(
                                [
                                    custom_filter.human_readable_predicate(dtype, entity_key)
                                    for dtype in datatypes
                                ]
                            )
                            errors.append(
                                gettext(
                                    'Value "%(value)s" for property %(prop_uri)s is not of expected type %(expected_types)s',
                                    value=value,
                                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                    expected_types=expected_types
                                )
                            )

                    # Closed value list: value must be one of the permitted options.
                    optional_values = matching_field_def.get("optionalValues", [])
                    if optional_values and value not in optional_values:
                        acceptable_values = ", ".join(
                            [
                                custom_filter.human_readable_predicate(val, entity_key)
                                for val in optional_values
                            ]
                        )
                        errors.append(
                            gettext(
                                'Value "%(value)s" is not permitted for property %(prop_uri)s. Acceptable values are: %(acceptable_values)s',
                                value=value,
                                prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                acceptable_values=acceptable_values
                            )
                        )

    # In the RDF model, a property with zero values is equivalent to the property being absent,
    # as a triple requires a subject, predicate, and object. Therefore, this section checks for
    # properties defined in the schema that are completely absent from the input data but are
    # required (min_count > 0). This complements the cardinality check above, which only
    # validates properties that are present in the data.
    # Check for missing required properties
    for prop_uri, field_definitions in entity_fields.items():
        if prop_uri not in properties:
            for field_def in field_definitions:
                min_count = field_def.get("min", 0)
                if min_count > 0:
                    value = gettext("values") if min_count > 1 else gettext("value")
                    errors.append(
                        gettext(
                            "Missing required property: %(prop_uri)s requires at least %(min_count)d %(value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            min_count=min_count,
                            value=value,
                        )
                    )
                    break  # Only need to report once per property

    return errors
@entity_bp.route("/entity-history/<path:entity_uri>")
@login_required
def entity_history(entity_uri):
    """
    Display the history of changes for an entity.

    Builds a TimelineJS-compatible structure with one event per provenance
    snapshot, including the responsible agent, primary source, a
    human-readable description, and the modifications the snapshot made.

    Args:
        entity_uri: URI of the entity
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)
    history = convert_to_rdflib_graphs(history, get_dataset_is_quadstore())

    # Snapshots ordered chronologically by generation time.
    sorted_metadata = sorted(
        provenance[entity_uri].items(),
        key=lambda x: convert_to_datetime(x[1]["generatedAtTime"]),
    )
    sorted_timestamps = [
        convert_to_datetime(meta["generatedAtTime"], stringify=True)
        for _, meta in sorted_metadata
    ]

    # Get correct context for entity label
    latest_metadata = sorted_metadata[-1][1] if sorted_metadata else None
    is_latest_deletion = (
        latest_metadata
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    )
    # For deleted entities use the snapshot preceding deletion as context.
    if is_latest_deletion and len(sorted_timestamps) > 1:
        context_snapshot = history[entity_uri][sorted_timestamps[-2]]
    else:
        context_snapshot = history[entity_uri][sorted_timestamps[-1]]

    entity_classes = [str(triple[2]) for triple in get_triples_from_graph(context_snapshot, (URIRef(entity_uri), RDF.type, None))]
    highest_priority_class = get_highest_priority_class(entity_classes)

    snapshot_entity_shape = determine_shape_for_entity_triples(
        list(get_triples_from_graph(context_snapshot, (URIRef(entity_uri), None, None)))
    )

    # Generate timeline events
    events = []
    for i, (snapshot_uri, metadata) in enumerate(sorted_metadata):
        date = convert_to_datetime(metadata["generatedAtTime"])
        snapshot_timestamp_str = convert_to_datetime(
            metadata["generatedAtTime"], stringify=True
        )
        snapshot_graph = history[entity_uri][snapshot_timestamp_str]

        responsible_agent = custom_filter.format_agent_reference(
            metadata["wasAttributedTo"]
        )
        primary_source = custom_filter.format_source_reference(
            metadata["hadPrimarySource"]
        )

        description = _format_snapshot_description(
            metadata,
            entity_uri,
            highest_priority_class,
            context_snapshot,
            history,
            sorted_timestamps,
            i,
            custom_filter,
        )
        modifications = metadata.get("hasUpdateQuery", "")
        modification_text = ""
        if modifications:
            # Render the snapshot's SPARQL update as human-readable HTML.
            parsed_modifications = parse_sparql_update(modifications)
            modification_text = generate_modification_text(
                parsed_modifications,
                highest_priority_class,
                snapshot_entity_shape,
                history=history,
                entity_uri=entity_uri,
                current_snapshot=snapshot_graph,
                current_snapshot_timestamp=snapshot_timestamp_str,
                custom_filter=custom_filter,
            )

        # Check if this version can be restored (not the latest version and there are multiple versions)
        can_restore = len(sorted_metadata) > 1 and i + 1 < len(sorted_metadata)
        restore_button = ""
        if can_restore:
            restore_button = f"""
                <form action='/restore-version/{entity_uri}/{metadata["generatedAtTime"]}' method='post' class='d-inline restore-form'>
                    <button type='submit' class='btn btn-success restore-btn'>
                        <i class='bi bi-arrow-counterclockwise me-1'></i>{gettext('Restore')}
                    </button>
                </form>
            """

        event = {
            "start_date": {
                "year": date.year,
                "month": date.month,
                "day": date.day,
                "hour": date.hour,
                "minute": date.minute,
                "second": date.second,
            },
            "text": {
                "headline": gettext("Snapshot") + " " + str(i + 1),
                "text": f"""
                    <p><strong>{gettext('Responsible agent')}:</strong> {responsible_agent}</p>
                    <p><strong>{gettext('Primary source')}:</strong> {primary_source}</p>
                    <p><strong>{gettext('Description')}:</strong> {description}</p>
                    <div class="modifications mb-3">
                        {modification_text}
                    </div>
                    <div class="d-flex gap-2 mt-2">
                        <a href='/entity-version/{entity_uri}/{metadata["generatedAtTime"]}' class='btn btn-outline-primary view-version' target='_self'>{gettext('View version')}</a>
                        {restore_button}
                    </div>
                """,
            },
            "autolink": False,
        }

        # The event ends where the next snapshot begins.
        if i + 1 < len(sorted_metadata):
            next_date = convert_to_datetime(
                sorted_metadata[i + 1][1]["generatedAtTime"]
            )
            event["end_date"] = {
                "year": next_date.year,
                "month": next_date.month,
                "day": next_date.day,
                "hour": next_date.hour,
                "minute": next_date.minute,
                "second": next_date.second,
            }

        events.append(event)

    entity_label = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, snapshot_entity_shape), context_snapshot
    )

    timeline_data = {
        "entityUri": entity_uri,
        "entityLabel": entity_label,
        "entityClasses": list(entity_classes),
        "entityShape": snapshot_entity_shape,
        "events": events,
    }

    return render_template("entity/history.jinja", timeline_data=timeline_data)
def _format_snapshot_description(
    metadata: dict,
    entity_uri: str,
    highest_priority_class: str,
    context_snapshot: Graph,
    history: dict,
    sorted_timestamps: list[str],
    current_index: int,
    custom_filter: Filter,
) -> str:
    """
    Format a snapshot description, replacing raw entity URIs with
    human-readable labels.

    For merge snapshots (detected via multiple ``wasDerivedFrom`` entries),
    the merged entity's URI embedded in the description text is resolved
    against the previous snapshot and replaced with its label. The main
    entity's URI is likewise replaced with its label when one is available.

    Args:
        metadata: The snapshot metadata dictionary.
        entity_uri: The URI of the main entity.
        highest_priority_class: The highest priority class for the entity.
        context_snapshot: The graph snapshot for context.
        history: The history dictionary containing snapshots.
        sorted_timestamps: Sorted list of snapshot timestamps.
        current_index: The index of the current snapshot in sorted_timestamps.
        custom_filter: The custom filter instance for formatting.

    Returns:
        The formatted description string.
    """
    description = metadata.get("description", "")

    # A snapshot derived from more than one predecessor is a merge snapshot.
    was_derived_from = metadata.get('wasDerivedFrom')
    is_merge_snapshot = isinstance(was_derived_from, list) and len(was_derived_from) > 1

    if is_merge_snapshot:
        # Regex to find URI after "merged with", potentially enclosed in single quotes or none
        match = re.search(r"merged with ['‘]?([^'’<>\s]+)['’]?", description)
        if match:
            potential_merged_uri = match.group(1)
            if validators.url(potential_merged_uri):
                merged_entity_uri_from_desc = potential_merged_uri
                merged_entity_label = None
                # The merged entity only exists in snapshots prior to the
                # merge, so resolve its label against the previous snapshot.
                if current_index > 0:
                    previous_snapshot_timestamp = sorted_timestamps[current_index - 1]
                    previous_snapshot_graph = history.get(entity_uri, {}).get(previous_snapshot_timestamp)
                    if previous_snapshot_graph:
                        raw_merged_entity_classes = [
                            str(o)
                            for s, p, o in get_triples_from_graph(
                                previous_snapshot_graph, (URIRef(merged_entity_uri_from_desc), RDF.type, None)
                            )
                        ]
                        highest_priority_merged_class = get_highest_priority_class(
                            raw_merged_entity_classes
                        ) if raw_merged_entity_classes else None

                        shape = determine_shape_for_classes(raw_merged_entity_classes)
                        merged_entity_label = custom_filter.human_readable_entity(
                            merged_entity_uri_from_desc,
                            (highest_priority_merged_class, shape),
                            previous_snapshot_graph,
                        )
                        # Only substitute when a real label (not the bare URI)
                        # was resolved.
                        if (
                            merged_entity_label
                            and merged_entity_label != merged_entity_uri_from_desc
                        ):
                            description = description.replace(
                                match.group(0), f"merged with '{merged_entity_label}'"
                            )

    shape = determine_shape_for_classes([highest_priority_class])
    entity_label_for_desc = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, shape), context_snapshot
    )
    if entity_label_for_desc and entity_label_for_desc != entity_uri:
        description = description.replace(f"'{entity_uri}'", f"'{entity_label_for_desc}'")

    return description
@entity_bp.route("/entity-version/<path:entity_uri>/<timestamp>")
@login_required
def entity_version(entity_uri, timestamp):
    """
    Display a specific version of an entity.

    The requested timestamp is matched to the closest available snapshot in
    the entity's history; the view then renders that snapshot's triples,
    its provenance metadata, navigation links to the neighbouring snapshots,
    and a human-readable list of the modifications that produced it.

    Args:
        entity_uri: URI of the entity
        timestamp: Timestamp of the version to display
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    try:
        timestamp_dt = datetime.fromisoformat(timestamp)
    except ValueError:
        # Not an ISO timestamp: treat it as a snapshot identifier and look up
        # its generation time in the provenance store.
        provenance_sparql = get_provenance_sparql()
        query_timestamp = f"""
        SELECT ?generation_time
        WHERE {{
            <{entity_uri}/prov/se/{timestamp}> <http://www.w3.org/ns/prov#generatedAtTime> ?generation_time.
        }}
        """
        provenance_sparql.setQuery(query_timestamp)
        provenance_sparql.setReturnFormat(JSON)
        try:
            generation_time = provenance_sparql.queryAndConvert()["results"][
                "bindings"
            ][0]["generation_time"]["value"]
        except IndexError:
            # No such snapshot identifier.
            abort(404)
        timestamp = generation_time
        timestamp_dt = datetime.fromisoformat(generation_time)

    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)
    history = convert_to_rdflib_graphs(history, get_dataset_is_quadstore())
    main_entity_history = history.get(entity_uri, {})
    sorted_timestamps = sorted(
        main_entity_history.keys(), key=lambda t: convert_to_datetime(t)
    )

    if not sorted_timestamps:
        abort(404)

    # Pick the snapshot whose timestamp is closest to the requested one.
    closest_timestamp = min(
        sorted_timestamps,
        key=lambda t: abs(
            convert_to_datetime(t).astimezone() - timestamp_dt.astimezone()
        ),
    )

    version = main_entity_history[closest_timestamp]
    triples = list(get_triples_from_graph(version, (URIRef(entity_uri), None, None)))

    entity_metadata = provenance.get(entity_uri, {})
    closest_metadata = None
    min_time_diff = None

    latest_timestamp = max(sorted_timestamps)
    latest_metadata = None

    # Find both the metadata closest to the requested time and the metadata
    # of the newest snapshot (needed for the deletion check below).
    for se_uri, meta in entity_metadata.items():
        meta_time = convert_to_datetime(meta["generatedAtTime"])
        time_diff = abs((meta_time - timestamp_dt).total_seconds())

        if closest_metadata is None or time_diff < min_time_diff:
            closest_metadata = meta
            min_time_diff = time_diff

        if meta["generatedAtTime"] == latest_timestamp:
            latest_metadata = meta

    if closest_metadata is None or latest_metadata is None:
        abort(404)

    # A deletion snapshot is either the latest snapshot being invalidated,
    # or a snapshot with no triples at all for the entity.
    is_deletion_snapshot = (
        closest_timestamp == latest_timestamp
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    ) or len(triples) == 0

    # For deletion snapshots, render labels/shape against the previous
    # snapshot, since the current one is empty.
    context_version = version
    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        current_index = sorted_timestamps.index(closest_timestamp)
        if current_index > 0:
            context_version = main_entity_history[sorted_timestamps[current_index - 1]]

    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        subject_classes = [
            o
            for _, _, o in get_triples_from_graph(context_version, (URIRef(entity_uri), RDF.type, None))
        ]
    else:
        subject_classes = [
            o for _, _, o in get_triples_from_graph(version, (URIRef(entity_uri), RDF.type, None))
        ]

    highest_priority_class = get_highest_priority_class(subject_classes)

    entity_shape = determine_shape_for_entity_triples(
        list(get_triples_from_graph(context_version, (URIRef(entity_uri), None, None)))
    )

    _, _, _, _, _, valid_predicates = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

    grouped_triples, relevant_properties = get_grouped_triples(
        entity_uri,
        triples,
        valid_predicates,
        historical_snapshot=context_version,
        highest_priority_class=highest_priority_class,
        highest_priority_shape=entity_shape
    )

    # 1-based version number, counted over the distinct snapshot times.
    snapshot_times = [
        convert_to_datetime(meta["generatedAtTime"])
        for meta in entity_metadata.values()
    ]
    snapshot_times = sorted(set(snapshot_times))
    version_number = snapshot_times.index(timestamp_dt) + 1

    # Timestamps for the previous/next navigation links, if any.
    next_snapshot_timestamp = None
    prev_snapshot_timestamp = None

    for snap_time in snapshot_times:
        if snap_time > timestamp_dt:
            next_snapshot_timestamp = snap_time.isoformat()
            break

    for snap_time in reversed(snapshot_times):
        if snap_time < timestamp_dt:
            prev_snapshot_timestamp = snap_time.isoformat()
            break

    modifications = ""
    if closest_metadata.get("hasUpdateQuery"):
        sparql_query = closest_metadata["hasUpdateQuery"]
        parsed_modifications = parse_sparql_update(sparql_query)
        modifications = generate_modification_text(
            parsed_modifications,
            highest_priority_class,
            entity_shape,
            history,
            entity_uri,
            context_version,
            closest_timestamp,
            custom_filter,
        )

    try:
        current_index = sorted_timestamps.index(closest_timestamp)
    except ValueError:
        current_index = -1

    # Replace URIs in the provenance description with readable labels.
    if closest_metadata.get("description"):
        formatted_description = _format_snapshot_description(
            closest_metadata,
            entity_uri,
            highest_priority_class,
            context_version,
            history,
            sorted_timestamps,
            current_index,
            custom_filter,
        )
        closest_metadata["description"] = formatted_description

    closest_timestamp = closest_metadata["generatedAtTime"]

    return render_template(
        "entity/version.jinja",
        subject=entity_uri,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        metadata={closest_timestamp: closest_metadata},
        timestamp=closest_timestamp,
        next_snapshot_timestamp=next_snapshot_timestamp,
        prev_snapshot_timestamp=prev_snapshot_timestamp,
        modifications=modifications,
        grouped_triples=grouped_triples,
        version_number=version_number,
        version=context_version,
    )
@entity_bp.route("/restore-version/<path:entity_uri>/<timestamp>", methods=["POST"])
@login_required
def restore_version(entity_uri, timestamp):
    """
    Restore an entity to a previous version.

    Computes the difference between the current state and the historical
    snapshot, then replays the inverse operations through an Editor so
    that the restoration itself is recorded in provenance. Related entities
    that were deleted in the meantime are marked as restored as well.

    Args:
        entity_uri: URI of the entity to restore
        timestamp: Timestamp of the version to restore to
    """
    # Normalize the timestamp to the canonical string form used as a
    # history dictionary key.
    timestamp = convert_to_datetime(timestamp, stringify=True)
    change_tracking_config = get_change_tracking_config()

    # Get entity history
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)
    history = convert_to_rdflib_graphs(history, get_dataset_is_quadstore())

    historical_graph = history.get(entity_uri, {}).get(timestamp)
    if historical_graph is None:
        abort(404)

    current_graph = fetch_current_state_with_related_entities(provenance)

    # The entity counts as deleted when it has no triples in the current state.
    is_deleted = len(list(get_triples_from_graph(current_graph, (URIRef(entity_uri), None, None)))) == 0

    triples_or_quads_to_delete, triples_or_quads_to_add = compute_graph_differences(
        current_graph, historical_graph
    )

    # Get all entities that need restoration
    entities_to_restore = get_entities_to_restore(
        triples_or_quads_to_delete, triples_or_quads_to_add, entity_uri
    )

    # Prepare snapshot information for all entities
    entity_snapshots = prepare_entity_snapshots(
        entities_to_restore, provenance, timestamp
    )

    # Create editor instance
    editor = Editor(
        get_dataset_endpoint(),
        get_provenance_endpoint(),
        current_app.config["COUNTER_HANDLER"],
        URIRef(get_responsible_agent_uri(current_user.orcid)),
        # A deleted entity has no current snapshot usable as primary source.
        None if is_deleted else entity_snapshots[entity_uri]["source"],
        current_app.config["DATASET_GENERATION_TIME"],
        dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
    )

    # Import current state into editor
    if get_dataset_is_quadstore():
        for quad in current_graph.quads():
            editor.g_set.add(quad)
    else:
        for triple in current_graph:
            editor.g_set.add(triple)

    editor.preexisting_finished()

    # Apply deletions
    for item in triples_or_quads_to_delete:
        # Items are quads (len 4) on a quadstore, plain triples otherwise.
        if len(item) == 4:
            editor.delete(item[0], item[1], item[2], item[3])
        else:
            editor.delete(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["restoration_source"] = (
                    entity_info["source"]
                )

    # Apply additions
    for item in triples_or_quads_to_add:
        if len(item) == 4:
            editor.create(item[0], item[1], item[2], item[3])
        else:
            editor.create(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["source"] = entity_info[
                    "source"
                ]

    # Handle main entity restoration if needed
    if is_deleted and entity_uri in entity_snapshots:
        editor.g_set.mark_as_restored(URIRef(entity_uri))
        source = entity_snapshots[entity_uri]["source"]
        editor.g_set.entity_index[URIRef(entity_uri)]["source"] = source

    try:
        editor.save()
        flash(gettext("Version restored successfully"), "success")
    except Exception as e:
        # Best-effort save: surface the failure to the user instead of a 500.
        flash(
            gettext(
                "An error occurred while restoring the version: %(error)s", error=str(e)
            ),
            "error",
        )

    return redirect(url_for("entity.about", subject=entity_uri))
def compute_graph_differences(
    current_graph: Graph | Dataset, historical_graph: Graph | Dataset
):
    """
    Compute the statements that differ between the current and historical states.

    Quads are compared when the dataset is a quadstore, plain triples otherwise.

    Returns:
        Tuple of (statements present only in the current graph, statements
        present only in the historical graph).
    """
    use_quads = get_dataset_is_quadstore()

    def statements(graph):
        # Extract the comparable statement set for a graph.
        if use_quads:
            return set(graph.quads())
        return set(get_triples_from_graph(graph, (None, None, None)))

    current_data = statements(current_graph)
    historical_data = statements(historical_graph)

    return current_data - historical_data, historical_data - current_data
def get_entities_to_restore(
    triples_or_quads_to_delete: set, triples_or_quads_to_add: set, main_entity_uri: str
) -> set:
    """
    Identify all entities that need to be restored based on the graph differences.

    Args:
        triples_or_quads_to_delete: Set of triples/quads to be deleted
        triples_or_quads_to_add: Set of triples/quads to be added
        main_entity_uri: URI of the main entity being restored

    Returns:
        Set of entity URIs that need to be restored
    """
    rdf_type_predicate = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
    entities = {main_entity_uri}

    for item in (*triples_or_quads_to_delete, *triples_or_quads_to_add):
        # rdf:type statements never introduce related entities to restore.
        if str(item[1]) == rdf_type_predicate:
            continue

        # Either end of the statement may reference a related entity.
        for uri in (str(item[0]), str(item[2])):
            if uri != main_entity_uri and validators.url(uri):
                entities.add(uri)

    return entities
def prepare_entity_snapshots(
    entities_to_restore: set, provenance: dict, target_time: str
) -> dict:
    """
    Prepare snapshot information for all entities that need to be restored.

    Args:
        entities_to_restore: Set of entity URIs to process
        provenance: Dictionary containing provenance data for all entities
        target_time: Target restoration time

    Returns:
        Dictionary mapping entity URIs to their restoration information
    """
    snapshots_info = {}

    for uri in entities_to_restore:
        # Entities without provenance cannot be restored.
        if uri not in provenance:
            continue

        entity_provenance = provenance[uri]

        # Find the appropriate source snapshot
        source_snapshot = find_appropriate_snapshot(entity_provenance, target_time)
        if not source_snapshot:
            continue

        # Check whether the entity is currently deleted by inspecting its
        # newest snapshot: a deletion snapshot is invalidated at the very
        # instant it was generated.
        ordered = sorted(
            entity_provenance.items(),
            key=lambda pair: convert_to_datetime(pair[1]["generatedAtTime"]),
        )
        newest = ordered[-1][1]
        currently_deleted = (
            newest.get("invalidatedAtTime")
            and newest["generatedAtTime"] == newest["invalidatedAtTime"]
        )

        snapshots_info[uri] = {
            "source": source_snapshot,
            "needs_restore": currently_deleted,
        }

    return snapshots_info
def find_appropriate_snapshot(provenance_data: dict, target_time: str) -> Optional[str]:
    """
    Find the most appropriate snapshot to use as a source for restoration.

    Args:
        provenance_data: Dictionary of snapshots and their metadata for an entity
        target_time: The target restoration time as ISO format string

    Returns:
        The URI of the most appropriate snapshot, or None if no suitable snapshot is found
    """
    target_datetime = convert_to_datetime(target_time)

    # Single pass: keep the most recent non-deletion snapshot at or before
    # the target time.
    best_time = None
    best_uri = None

    for snapshot_uri, snapshot_meta in provenance_data.items():
        # Deletion snapshots (generated and invalidated at the same instant)
        # cannot serve as a restoration source.
        invalidated = snapshot_meta.get("invalidatedAtTime")
        if invalidated and snapshot_meta["generatedAtTime"] == invalidated:
            continue

        generation_time = convert_to_datetime(snapshot_meta["generatedAtTime"])
        if generation_time > target_datetime:
            continue

        # ">=" keeps the later-visited snapshot on equal timestamps, matching
        # the original stable-sort-then-take-last behaviour.
        if best_time is None or generation_time >= best_time:
            best_time, best_uri = generation_time, snapshot_uri

    return best_uri
def determine_object_class_and_shape(object_value: str, relevant_snapshot: Graph) -> tuple[Optional[str], Optional[str]]:
    """
    Determine the class and shape for an object value from a graph snapshot.

    Args:
        object_value: The object value (URI or literal)
        relevant_snapshot: Graph snapshot to query for object information

    Returns:
        Tuple of (object_class, object_shape_uri) or (None, None) if not determinable
    """
    # Literals and missing snapshots carry no class/shape information.
    if not validators.url(str(object_value)) or not relevant_snapshot:
        return None, None

    object_ref = URIRef(object_value)
    object_triples = list(get_triples_from_graph(relevant_snapshot, (object_ref, None, None)))
    if not object_triples:
        return None, None

    shape_uri = determine_shape_for_entity_triples(object_triples)

    class_candidates = [
        str(obj)
        for _, _, obj in get_triples_from_graph(
            relevant_snapshot, (object_ref, RDF.type, None)
        )
    ]
    best_class = get_highest_priority_class(class_candidates) if class_candidates else None

    return best_class, shape_uri
def generate_modification_text(
    modifications,
    highest_priority_class,
    entity_shape,
    history,
    entity_uri,
    current_snapshot,
    current_snapshot_timestamp,
    custom_filter: Filter,
) -> str:
    """
    Generate HTML text describing modifications to an entity, using display rules for property ordering.

    Args:
        modifications (dict): Dictionary of modifications from parse_sparql_update
        highest_priority_class (str): The highest priority class for the subject entity
        entity_shape (str): The shape for the subject entity
        history (dict): Historical snapshots dictionary
        entity_uri (str): URI of the entity being modified
        current_snapshot (Graph): Current entity snapshot
        current_snapshot_timestamp (str): Timestamp of current snapshot
        custom_filter (Filter): Filter instance for formatting

    Returns:
        str: HTML text describing the modifications
    """
    modification_text = "<p><strong>" + gettext("Modifications") + "</strong></p>"

    ordered_properties = get_property_order_from_rules(highest_priority_class, entity_shape)

    for mod_type, triples in modifications.items():
        modification_text += "<ul class='list-group mb-3'><p>"
        if mod_type == gettext("Additions"):
            modification_text += '<i class="bi bi-plus-circle-fill text-success"></i>'
        elif mod_type == gettext("Deletions"):
            modification_text += '<i class="bi bi-dash-circle-fill text-danger"></i>'
        modification_text += " <em>" + gettext(mod_type) + "</em></p>"

        # Per-modification-type caches; rebuilt for each section.
        object_shapes_cache = {}
        object_classes_cache = {}

        # For deletions the objects are gone from the current snapshot, so
        # labels must be resolved against the snapshot that precedes it.
        relevant_snapshot = None
        if (
            mod_type == gettext("Deletions")
            and history
            and entity_uri
            and current_snapshot_timestamp
        ):
            sorted_timestamps = sorted(history[entity_uri].keys())
            current_index = sorted_timestamps.index(current_snapshot_timestamp)
            if current_index > 0:
                relevant_snapshot = history[entity_uri][
                    sorted_timestamps[current_index - 1]
                ]
        else:
            relevant_snapshot = current_snapshot

        # Pre-compute each object's class and shape once, avoiding repeated
        # graph queries during formatting.
        if relevant_snapshot:
            for triple in triples:
                object_value = triple[2]
                object_class, object_shape = determine_object_class_and_shape(object_value, relevant_snapshot)
                object_classes_cache[str(object_value)] = object_class
                object_shapes_cache[str(object_value)] = object_shape

        predicate_shape_groups = {}
        predicate_ordering_cache = {}
        entity_position_cache = {}

        # Group triples by (predicate, object shape) and cache ordering
        # info / sequence positions for ordered predicates.
        for triple in triples:
            predicate = str(triple[1])
            object_value = str(triple[2])
            object_shape_uri = object_shapes_cache.get(object_value)

            if predicate not in predicate_ordering_cache:
                predicate_ordering_cache[predicate] = get_predicate_ordering_info(predicate, highest_priority_class, entity_shape)

            order_property = predicate_ordering_cache[predicate]
            if order_property and validators.url(object_value) and relevant_snapshot:
                position_key = (object_value, predicate)
                if position_key not in entity_position_cache:
                    entity_position_cache[position_key] = get_entity_position_in_sequence(
                        object_value, entity_uri, predicate, order_property, relevant_snapshot
                    )

            group_key = (predicate, object_shape_uri)
            if group_key not in predicate_shape_groups:
                predicate_shape_groups[group_key] = []
            predicate_shape_groups[group_key].append(triple)

        processed_predicates = set()

        def get_cached_position(triple, predicate_uri):
            # Sort key helper: triples without a cached position sort last.
            object_value = str(triple[2])
            position_key = (object_value, predicate_uri)
            if position_key in entity_position_cache:
                return entity_position_cache[position_key]
            return float('inf')

        # First emit the predicates listed in the display rules, in order;
        # within a predicate, groups are ordered by object-shape priority.
        for predicate in ordered_properties:
            shape_order = get_shape_order_from_display_rules(highest_priority_class, entity_shape, predicate)
            predicate_groups = []
            for group_key, group_triples in predicate_shape_groups.items():
                predicate_uri, object_shape_uri = group_key
                if predicate_uri == predicate:
                    if object_shape_uri and object_shape_uri in shape_order:
                        shape_priority = shape_order.index(object_shape_uri)
                    else:
                        # Objects without shapes or shapes not in display rules go at the end
                        shape_priority = len(shape_order)

                    predicate_groups.append((shape_priority, group_key, group_triples))

            predicate_groups.sort(key=lambda x: x[0])

            for _, group_key, group_triples in predicate_groups:
                processed_predicates.add(group_key)

                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                # Ordered predicates: sort by position in the sequence.
                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        # Then handle any remaining predicate+shape groups not in the ordered list
        for group_key, group_triples in predicate_shape_groups.items():
            if group_key not in processed_predicates:
                # Sort remaining triples by their cached positions too
                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        modification_text += "</ul>"

    return modification_text
def format_triple_modification(
    triple: Tuple[URIRef, URIRef, URIRef|Literal],
    highest_priority_class: str,
    entity_shape: str,
    object_shapes_cache: dict,
    object_classes_cache: dict,
    relevant_snapshot: Optional[Graph],
    custom_filter: Filter,
    subject_uri: str = None,
    predicate_ordering_cache: Optional[dict] = None,
    entity_position_cache: Optional[dict] = None,
) -> str:
    """
    Format a single triple modification as HTML.

    Args:
        triple: The RDF triple being modified
        highest_priority_class: The highest priority class for the subject entity
        entity_shape: The shape for the subject entity
        object_shapes_cache: Pre-computed cache of object shapes
        object_classes_cache: Pre-computed cache of object classes
        relevant_snapshot: Graph snapshot for context
        custom_filter (Filter): Filter instance for formatting
        subject_uri: URI of the subject entity (for ordering queries)
        predicate_ordering_cache: Pre-computed predicate -> order-property map
        entity_position_cache: Pre-computed (object, predicate) -> position map

    Returns:
        str: HTML text describing the modification
    """
    predicate = triple[1]
    object_value = triple[2]
    object_key = str(object_value)

    object_shape_uri = object_shapes_cache.get(object_key)
    object_class = object_classes_cache.get(object_key)

    predicate_label = custom_filter.human_readable_predicate(
        predicate, (highest_priority_class, entity_shape), object_shape_uri=object_shape_uri
    )
    object_label = get_object_label(
        object_value,
        predicate,
        object_shape_uri,
        object_class,
        relevant_snapshot,
        custom_filter,
        subject_entity_key=(highest_priority_class, entity_shape),
    )

    # Append a position badge for objects in an ordered sequence.
    order_info = ""
    if (
        subject_uri
        and validators.url(object_key)
        and predicate_ordering_cache
        and entity_position_cache
        and predicate_ordering_cache.get(str(predicate))
    ):
        position = entity_position_cache.get((object_key, str(predicate)))
        if position is not None:
            order_info = f' <span class="order-position-badge">#{position}</span>'

    return f"""
    <li class='d-flex align-items-center'>
        <span class='flex-grow-1 d-flex flex-column justify-content-center ms-3 mb-2 w-100'>
            <strong>{predicate_label}{order_info}</strong>
            <span class="object-value word-wrap">{object_label}</span>
        </span>
    </li>"""
def get_object_label(
    object_value: str,
    predicate: str,
    object_shape_uri: Optional[str],
    object_class: Optional[str],
    snapshot: Optional[Graph],
    custom_filter: Filter,
    subject_entity_key: Optional[tuple] = None,
) -> str:
    """
    Get appropriate display label for an object value.

    Args:
        object_value: The value to get a label for
        predicate: The predicate URI
        object_shape_uri: Pre-computed shape URI for the object
        object_class: Pre-computed class for the object
        snapshot: Graph snapshot for context (essential for deleted triples)
        custom_filter (Filter): Custom filter instance for formatting
        subject_entity_key: Tuple of (class, shape) for the subject entity

    Returns:
        str: A human-readable label for the object value
    """
    # rdf:type objects are rendered as the subject's class label.
    if str(predicate) == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
        return custom_filter.human_readable_class(subject_entity_key)

    # URI objects with a known class or shape get a resolved entity label;
    # anything else (literals, unknown URIs) falls back to the raw value.
    if validators.url(object_value) and (object_shape_uri or object_class):
        return custom_filter.human_readable_entity(
            object_value, (object_class, object_shape_uri), snapshot
        )

    return str(object_value)
def process_modification_data(data: dict) -> Tuple[str, List[dict]]:
    """
    Extract the subject URI and the modification list from a request payload.

    Args:
        data: Dictionary containing modification data

    Returns:
        Tuple containing subject URI and list of modification details

    Raises:
        ValueError: If the subject URI or the modification list is missing/empty.
    """
    subject_uri = data.get("subject")
    modifications = data.get("modifications", [])

    if not subject_uri:
        raise ValueError("No subject URI provided in modification data")
    if not modifications:
        raise ValueError("No modifications provided in data")

    return subject_uri, modifications
def validate_modification(
    modification: dict, subject_uri: str
) -> Tuple[bool, str]:
    """
    Validate a single modification operation against the configured form fields.

    Args:
        modification: Dictionary containing modification details
        subject_uri: URI of the subject being modified

    Returns:
        Tuple of (is_valid, error_message); the message is empty when valid.
    """
    form_fields = get_form_fields()

    # Basic payload checks first.
    operation = modification.get("operation")
    if not operation:
        return False, "No operation specified in modification"

    predicate = modification.get("predicate")
    if not predicate:
        return False, "No predicate specified in modification"

    if operation not in ("add", "remove", "update"):
        return False, f"Invalid operation: {operation}"

    # Without form-field configuration there is nothing more to validate.
    if not form_fields:
        return True, ""

    entity_type = modification.get("entity_type")
    entity_shape = modification.get("entity_shape")

    # If entity_type is not provided in modification, get it from the database
    if not entity_type:
        entity_types = get_entity_types(subject_uri)
        if entity_types:
            entity_type = get_highest_priority_class(entity_types)

    matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    if matching_key:
        for field in form_fields[matching_key].get(predicate, []):
            # minCount > 0 marks the predicate as required.
            if operation == "remove" and field.get("minCount", 0) > 0:
                return False, f"Cannot remove required predicate: {predicate}"

            if operation == "add":
                current_count = get_predicate_count(subject_uri, predicate)
                max_count = field.get("maxCount")
                if max_count and current_count >= max_count:
                    return (
                        False,
                        f"Maximum count exceeded for predicate: {predicate}",
                    )

    return True, ""
def get_predicate_count(subject_uri: str, predicate: str) -> int:
    """
    Get the current count of values for a predicate.

    Args:
        subject_uri: URI of the entity
        predicate: Predicate URI to count

    Returns:
        Number of values for the predicate
    """
    query = f"""
    SELECT (COUNT(?o) as ?count) WHERE {{
        <{subject_uri}> <{predicate}> ?o .
    }}
    """

    sparql = get_sparql()
    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)
    response = sparql.query().convert()

    bindings = response["results"]["bindings"]
    return int(bindings[0]["count"]["value"])
def apply_modifications(
    editor: Editor,
    modifications: List[dict],
    subject_uri: str,
    graph_uri: Optional[str] = None,
):
    """
    Apply a list of modifications to an entity.

    Supported operations are "remove" (drops every value of the predicate),
    "add" and "update"; any other operation value is silently skipped, which
    mirrors the behaviour relied upon by callers that pre-validate with
    validate_modification().

    Args:
        editor: Editor instance to use for modifications
        modifications: List of modification operations
        subject_uri: URI of the entity being modified
        graph_uri: Optional graph URI for quad store
    """

    def to_rdf_term(value, datatype):
        # URLs become URIRefs; everything else a typed Literal.
        if validators.url(value):
            return URIRef(value)
        return Literal(value, datatype=URIRef(datatype))

    subject = URIRef(subject_uri)

    for mod in modifications:
        operation = mod["operation"]
        predicate = URIRef(mod["predicate"])

        if operation == "remove":
            # No object given: removes all values for the predicate.
            editor.delete(subject, predicate, graph_uri=graph_uri)

        elif operation == "add":
            datatype = mod.get("datatype", XSD.string)
            editor.create(
                subject, predicate, to_rdf_term(mod["value"], datatype), graph_uri
            )

        elif operation == "update":
            datatype = mod.get("datatype", XSD.string)
            editor.update(
                subject,
                predicate,
                to_rdf_term(mod["oldValue"], datatype),
                to_rdf_term(mod["newValue"], datatype),
                graph_uri,
            )