Coverage for heritrace/routes/entity.py: 89%
776 statements
« prev ^ index » next — coverage.py v7.6.12, created at 2025-10-13 17:12 +0000
1import json
2import re
3from datetime import datetime
4from typing import List, Optional, Tuple
6import validators
7from flask import (Blueprint, abort, current_app, flash, jsonify, redirect,
8 render_template, request, url_for)
9from flask_babel import gettext
10from flask_login import current_user, login_required
11from rdflib import RDF, XSD, ConjunctiveGraph, Graph, Literal, URIRef
12from SPARQLWrapper import JSON
13from time_agnostic_library.agnostic_entity import AgnosticEntity
15from heritrace.apis.orcid import get_responsible_agent_uri
16from heritrace.editor import Editor
17from heritrace.extensions import (get_change_tracking_config,
18 get_custom_filter, get_dataset_endpoint,
19 get_dataset_is_quadstore, get_display_rules,
20 get_form_fields, get_provenance_endpoint,
21 get_provenance_sparql, get_shacl_graph,
22 get_sparql)
23from heritrace.forms import *
24from heritrace.utils.converters import convert_to_datetime
25from heritrace.utils.datatypes import DATATYPE_MAPPING, get_datatype_options
26from heritrace.utils.display_rules_utils import (
27 get_class_priority, get_grouped_triples, get_highest_priority_class,
28 get_predicate_ordering_info, get_property_order_from_rules,
29 get_shape_order_from_display_rules, is_entity_type_visible)
30from heritrace.utils.filters import Filter
31from heritrace.utils.primary_source_utils import (
32 get_default_primary_source, save_user_default_primary_source)
33from heritrace.utils.shacl_utils import (determine_shape_for_entity_triples,
34 find_matching_form_field,
35 get_entity_position_in_sequence)
36from heritrace.utils.shacl_validation import get_valid_predicates
37from heritrace.utils.sparql_utils import (
38 determine_shape_for_classes, fetch_current_state_with_related_entities,
39 fetch_data_graph_for_subject, get_entity_types, import_referenced_entities,
40 parse_sparql_update)
41from heritrace.utils.uri_utils import generate_unique_uri
42from heritrace.utils.virtual_properties import \
43 get_virtual_properties_for_entity, \
44 transform_entity_creation_with_virtual_properties, \
45 remove_virtual_properties_from_creation_data
def _prepare_entity_creation_data(structured_data):
    """
    Strip virtual properties from the submitted payload and derive the core
    pieces needed to create a new entity.

    Args:
        structured_data: Raw structured-data dict submitted by the client.

    Returns:
        Tuple of (cleaned_structured_data, entity_type, properties, entity_uri)
    """
    cleaned = remove_virtual_properties_from_creation_data(structured_data)
    requested_type = cleaned.get("entity_type")
    requested_properties = cleaned.get("properties", {})
    # Mint a fresh URI for the entity based on its declared type.
    new_uri = generate_unique_uri(requested_type)

    return cleaned, requested_type, requested_properties, new_uri
def _setup_editor_for_creation(editor, cleaned_structured_data):
    """
    Prime the editor before creating a new entity.

    Loads any entities referenced by the payload into the editor, then marks
    the pre-existing state as finished so subsequent writes are tracked as
    new additions.
    """
    import_referenced_entities(editor, cleaned_structured_data)
    editor.preexisting_finished()
def _process_virtual_properties_after_creation(editor, structured_data, entity_uri, default_graph_uri):
    """
    Create and persist any virtual entities derived from the submitted data
    once the main entity has been created.
    """
    derived_entities = transform_entity_creation_with_virtual_properties(structured_data, str(entity_uri))

    if not derived_entities:
        return

    for derived in derived_entities:
        derived_uri = generate_unique_uri(derived.get("entity_type"))
        create_nested_entity(editor, derived_uri, derived, default_graph_uri)

    # Persist the newly created virtual entities.
    editor.save()
85entity_bp = Blueprint("entity", __name__)
def get_deleted_entity_context_info(is_deleted: bool, sorted_timestamps: List[str],
                                    history: dict, subject: str) -> Tuple[Optional[Graph], Optional[str], Optional[str]]:
    """
    Extract context information for deleted entities with multiple timestamps.

    When an entity is deleted but has more than one timestamp in its history,
    this retrieves the snapshot at the second-to-last timestamp and derives
    the entity's highest priority class and shape from it.

    Args:
        is_deleted: Whether the entity is deleted
        sorted_timestamps: List of timestamps in chronological order
        history: Dictionary mapping subject -> timestamp -> Graph
        subject: The entity URI as string

    Returns:
        Tuple of (context_snapshot, highest_priority_class, entity_shape),
        or (None, None, None) if the conditions are not met.
    """
    if not (is_deleted and len(sorted_timestamps) > 1):
        return None, None, None

    # Use the last snapshot *before* the deletion as the context.
    context_snapshot = history[subject][sorted_timestamps[-2]]
    subject_ref = URIRef(subject)

    subject_classes = [
        obj for _, _, obj in context_snapshot.triples((subject_ref, RDF.type, None))
    ]

    highest_priority_class = get_highest_priority_class(subject_classes)
    entity_shape = determine_shape_for_entity_triples(
        list(context_snapshot.triples((subject_ref, None, None)))
    )

    return context_snapshot, highest_priority_class, entity_shape
@entity_bp.route("/about/<path:subject>")
@login_required
def about(subject):
    """
    Display detailed information about an entity.

    Renders the entity's current triples (grouped per display rules) together
    with its change history, editing form metadata, and deletion status.
    Aborts with 404 when the entity has neither history nor data.

    Args:
        subject: URI of the entity to display
    """
    change_tracking_config = get_change_tracking_config()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # Fetch the full snapshot history plus provenance metadata for the entity.
    agnostic_entity = AgnosticEntity(
        res=subject, config=change_tracking_config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    is_deleted = False
    context_snapshot = None
    subject_classes = []
    highest_priority_class = None
    entity_shape = None

    if history.get(subject):
        sorted_timestamps = sorted(history[subject].keys())
        # Provenance record whose generation time matches the newest snapshot.
        latest_metadata = next(
            (
                meta
                for _, meta in provenance[subject].items()
                if meta["generatedAtTime"] == sorted_timestamps[-1]
            ),
            None,
        )

        # The entity counts as deleted when its latest snapshot was invalidated.
        is_deleted = (
            latest_metadata
            and "invalidatedAtTime" in latest_metadata
            and latest_metadata["invalidatedAtTime"]
        )

        context_snapshot, highest_priority_class, entity_shape = get_deleted_entity_context_info(
            is_deleted, sorted_timestamps, history, subject
        )

    grouped_triples = {}
    can_be_added = []
    can_be_deleted = []
    datatypes = {}
    mandatory_values = {}
    optional_values = {}
    valid_predicates = []
    data_graph = None

    if not is_deleted:
        data_graph = fetch_data_graph_for_subject(subject)

        # Check if entity exists - if no history and no data_graph, entity doesn't exist
        if not history.get(subject) and (not data_graph or len(data_graph) == 0):
            abort(404)

        if data_graph:
            triples = list(data_graph.triples((None, None, None)))
            subject_classes = [o for s, p, o in data_graph.triples((URIRef(subject), RDF.type, None))]

            highest_priority_class = get_highest_priority_class(subject_classes)
            entity_shape = determine_shape_for_entity_triples(
                list(data_graph.triples((URIRef(subject), None, None)))
            )

            # SHACL-derived editing metadata for the current state of the entity.
            (
                can_be_added,
                can_be_deleted,
                datatypes,
                mandatory_values,
                optional_values,
                valid_predicates,
            ) = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

            grouped_triples, relevant_properties = get_grouped_triples(
                subject, triples, valid_predicates, highest_priority_class=highest_priority_class, highest_priority_shape=entity_shape
            )

            # Virtual properties are appended to the addable/deletable lists
            # even though they are not stored directly on the entity.
            virtual_properties = get_virtual_properties_for_entity(highest_priority_class, entity_shape)

            can_be_added = [uri for uri in can_be_added if uri in relevant_properties] + [vp[0] for vp in virtual_properties]
            can_be_deleted = [
                uri for uri in can_be_deleted if uri in relevant_properties
            ] + [vp[0] for vp in virtual_properties]

    update_form = UpdateTripleForm()

    form_fields = get_form_fields()

    datatype_options = get_datatype_options()

    # Index field definitions by (predicate, entity type key, node shape) for
    # quick lookup in the template.
    predicate_details_map = {}
    for entity_type_key, predicates in form_fields.items():
        for predicate_uri, details_list in predicates.items():
            for details in details_list:
                shape = details.get("nodeShape")
                key = (predicate_uri, entity_type_key, shape)
                predicate_details_map[key] = details

    return render_template(
        "entity/about.jinja",
        subject=subject,
        history=history,
        can_be_added=can_be_added,
        can_be_deleted=can_be_deleted,
        datatypes=datatypes,
        update_form=update_form,
        mandatory_values=mandatory_values,
        optional_values=optional_values,
        shacl=bool(len(get_shacl_graph())),
        grouped_triples=grouped_triples,
        display_rules=get_display_rules(),
        form_fields=form_fields,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        predicate_details_map=predicate_details_map,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        is_deleted=is_deleted,
        context=context_snapshot,
        default_primary_source=default_primary_source,
        datatype_options=datatype_options,
    )
@entity_bp.route("/create-entity", methods=["GET", "POST"])
@login_required
def create_entity():
    """
    Create a new entity in the dataset.

    GET renders the creation form. POST parses the submitted structured data,
    validates it (against SHACL-derived form fields when available), writes
    the new entity and any nested/virtual entities via the Editor, and
    returns a JSON payload with a redirect URL or validation errors.
    """
    form_fields = get_form_fields()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # (class, shape) pairs the user may create, highest priority first.
    entity_class_shape_pairs = sorted(
        [
            entity_key
            for entity_key in form_fields.keys()
            if is_entity_type_visible(entity_key)
        ],
        key=lambda et: get_class_priority(et),
        reverse=True,
    )

    datatype_options = get_datatype_options()

    if request.method == "POST":
        structured_data = json.loads(request.form.get("structured_data", "{}"))
        primary_source = request.form.get("primary_source") or None
        save_default_source = request.form.get("save_default_source") == 'true'

        if primary_source and not validators.url(primary_source):
            return jsonify({"status": "error", "errors": [gettext("Invalid primary source URL provided")]}), 400

        # Optionally remember this primary source as the user's default.
        if save_default_source and primary_source and validators.url(primary_source):
            save_user_default_primary_source(current_user.orcid, primary_source)

        editor = Editor(
            get_dataset_endpoint(),
            get_provenance_endpoint(),
            current_app.config["COUNTER_HANDLER"],
            URIRef(get_responsible_agent_uri(current_user.orcid)),
            primary_source,
            current_app.config["DATASET_GENERATION_TIME"],
            dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        )

        if not structured_data.get("entity_type"):
            return jsonify({"status": "error", "errors": [gettext("Entity type is required")]}), 400

        # Prepare common data for entity creation
        cleaned_structured_data, entity_type, properties, entity_uri = _prepare_entity_creation_data(structured_data)

        # Quadstores get a per-entity named graph; triplestores use the default graph.
        default_graph_uri = (
            URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
        )

        if form_fields:
            # SHACL-driven path: validate against the form field definitions first.
            validation_errors = validate_entity_data(cleaned_structured_data)
            if validation_errors:
                return jsonify({"status": "error", "errors": validation_errors}), 400

            _setup_editor_for_creation(editor, cleaned_structured_data)

            for predicate, values in properties.items():
                if not isinstance(values, list):
                    values = [values]

                entity_shape = cleaned_structured_data.get("entity_shape")
                matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

                field_definitions = form_fields.get(matching_key, {}).get(predicate, []) if matching_key else []

                # Get the shape from the property value if available
                property_shape = None
                if values and isinstance(values[0], dict):
                    property_shape = values[0].get("shape")

                # Filter field definitions to find the matching one based on shape
                matching_field_def = None
                for field_def in field_definitions:
                    if property_shape:
                        # If property has a shape, match it with the field definition's subjectShape
                        if field_def.get("subjectShape") == property_shape:
                            matching_field_def = field_def
                            break
                    else:
                        # If no shape specified, use the first field definition without a shape requirement
                        if not field_def.get("subjectShape"):
                            matching_field_def = field_def
                            break

                # If no matching field definition found, use the first one (default behavior)
                if not matching_field_def and field_definitions:
                    matching_field_def = field_definitions[0]

                ordered_by = (
                    matching_field_def.get("orderedBy") if matching_field_def else None
                )

                if ordered_by:
                    # Ordered properties are linked into a sequence via orderedBy.
                    process_ordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, ordered_by
                    )
                else:
                    # Handle unordered properties
                    process_unordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, matching_field_def
                    )
        else:
            # No SHACL form fields configured: write the submitted triples directly.
            editor.import_entity(entity_uri)
            _setup_editor_for_creation(editor, cleaned_structured_data)

            editor.create(
                entity_uri,
                RDF.type,
                URIRef(entity_type),
                default_graph_uri,
            )

            for predicate, values in properties.items():
                for value_dict in values:
                    if value_dict["type"] == "uri":
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            URIRef(value_dict["value"]),
                            default_graph_uri,
                        )
                    elif value_dict["type"] == "literal":
                        # Fall back to xsd:string when no datatype was supplied.
                        datatype = (
                            URIRef(value_dict["datatype"])
                            if "datatype" in value_dict
                            else XSD.string
                        )
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            Literal(value_dict["value"], datatype=datatype),
                            default_graph_uri,
                        )

        try:
            # Save the main entity first
            editor.save()

            # Process virtual properties after creation
            _process_virtual_properties_after_creation(editor, structured_data, entity_uri, default_graph_uri)

            response = jsonify(
                {
                    "status": "success",
                    "redirect_url": url_for("entity.about", subject=str(entity_uri)),
                }
            )
            flash(gettext("Entity created successfully"), "success")
            return response, 200
        except Exception as e:
            error_message = gettext(
                "An error occurred while creating the entity: %(error)s", error=str(e)
            )
            return jsonify({"status": "error", "errors": [error_message]}), 500

    return render_template(
        "create_entity.jinja",
        datatype_options=datatype_options,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        default_primary_source=default_primary_source,
        shacl=bool(form_fields),
        entity_class_shape_pairs=entity_class_shape_pairs
    )
def create_nested_entity(
    editor: Editor, entity_uri, entity_data, graph_uri=None
):
    """
    Recursively create an entity and all of its nested entities.

    Writes the rdf:type triple, then walks the entity's properties: nested
    entity dicts are created recursively (optionally via an intermediate
    relation entity), references to existing entities are linked by URI,
    and simple values become URIRefs or typed Literals.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI to assign to the entity being created
        entity_data: Dict with "entity_type", optional "entity_shape",
            and "properties"
        graph_uri: Optional named-graph URI for quadstores
    """
    form_fields = get_form_fields()

    editor.create(
        entity_uri,
        URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
        URIRef(entity_data["entity_type"]),
        graph_uri,
    )

    entity_type = entity_data.get("entity_type")
    entity_shape = entity_data.get("entity_shape")
    properties = entity_data.get("properties", {})

    matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    # Without a matching form-field definition only the type triple is written.
    if not matching_key:
        return

    # Add other properties
    for predicate, values in properties.items():
        if not isinstance(values, list):
            values = [values]
        field_definitions = form_fields[matching_key].get(predicate, [])

        for value in values:
            if isinstance(value, dict) and "entity_type" in value:
                if "intermediateRelation" in value:
                    # Link through an intermediate entity:
                    # entity --predicate--> intermediate --property--> target.
                    intermediate_uri = generate_unique_uri(
                        value["intermediateRelation"]["class"]
                    )
                    target_uri = generate_unique_uri(value["entity_type"])
                    editor.create(
                        entity_uri, URIRef(predicate), intermediate_uri, graph_uri
                    )
                    editor.create(
                        intermediate_uri,
                        URIRef(value["intermediateRelation"]["property"]),
                        target_uri,
                        graph_uri,
                    )
                    create_nested_entity(
                        editor, target_uri, value, graph_uri
                    )
                else:
                    # Handle nested entities
                    nested_uri = generate_unique_uri(value["entity_type"])
                    editor.create(entity_uri, URIRef(predicate), nested_uri, graph_uri)
                    create_nested_entity(
                        editor, nested_uri, value, graph_uri
                    )
            elif isinstance(value, dict) and value.get("is_existing_entity", False):
                # Reference to an existing entity: just link it, don't create it.
                existing_entity_uri = value.get("entity_uri")
                if existing_entity_uri:
                    editor.create(entity_uri, URIRef(predicate), URIRef(existing_entity_uri), graph_uri)
            else:
                # Handle simple properties - check if it's a URI or literal
                if validators.url(str(value)):
                    object_value = URIRef(value)
                else:
                    datatype = XSD.string  # Default to string if not specified
                    datatype_uris = []
                    if field_definitions:
                        datatype_uris = field_definitions[0].get("datatypes", [])
                    datatype = determine_datatype(value, datatype_uris)
                    object_value = Literal(value, datatype=datatype)
                editor.create(entity_uri, URIRef(predicate), object_value, graph_uri)
def process_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri, matching_field_def):
    """
    Process a single entity value, handling nested entities, existing entity
    references, and simple URI/literal values.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        value: Value to process (dict or primitive)
        default_graph_uri: Default graph URI for quad stores
        matching_field_def: Field definition for datatype validation

    Returns:
        URIRef or Literal: The created/referenced object value.

    Raises:
        ValueError: If an existing-entity reference lacks an entity_uri.
    """
    # Nested entity dict: mint a URI, link it, and create it recursively.
    if isinstance(value, dict) and "entity_type" in value:
        nested_uri = generate_unique_uri(value["entity_type"])
        editor.create(entity_uri, URIRef(predicate), nested_uri, default_graph_uri)
        create_nested_entity(editor, nested_uri, value, default_graph_uri)
        return nested_uri

    # Reference to an already-existing entity: link to its URI.
    if isinstance(value, dict) and value.get("is_existing_entity", False):
        referenced_uri = value.get("entity_uri")
        if not referenced_uri:
            raise ValueError("Missing entity_uri in existing entity reference")
        object_value = URIRef(referenced_uri)
        editor.create(entity_uri, URIRef(predicate), object_value, default_graph_uri)
        return object_value

    # Plain value: URLs become URIRefs, everything else a typed Literal.
    if validators.url(str(value)):
        object_value = URIRef(value)
    else:
        datatype_uris = matching_field_def.get("datatypes", []) if matching_field_def else []
        object_value = Literal(value, datatype=determine_datatype(value, datatype_uris))
    editor.create(entity_uri, URIRef(predicate), object_value, default_graph_uri)
    return object_value
def process_ordered_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri):
    """
    Process a single entity value for ordered properties.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        value: Value to process (dict)
        default_graph_uri: Default graph URI for quad stores

    Returns:
        URIRef: The URI of the created/referenced entity

    Raises:
        ValueError: If the value is neither a nested entity nor a valid
            existing-entity reference.
    """
    if isinstance(value, dict) and "entity_type" in value:
        # Nested entity: mint a URI, link it, and create it recursively.
        nested_uri = generate_unique_uri(value["entity_type"])
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        create_nested_entity(
            editor,
            nested_uri,
            value,
            default_graph_uri
        )
        return nested_uri
    elif isinstance(value, dict) and value.get("is_existing_entity", False):
        # Reference to an existing entity: link to its URI.
        # BUGFIX: previously this wrapped the whole dict (URIRef(value)),
        # producing an invalid URI; extract the referenced entity's URI
        # instead, consistently with process_entity_value.
        existing_uri = value.get("entity_uri")
        if not existing_uri:
            raise ValueError("Missing entity_uri in existing entity reference")
        nested_uri = URIRef(existing_uri)
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        return nested_uri
    else:
        raise ValueError("Unexpected value type for ordered property")
def process_ordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, ordered_by):
    """
    Process ordered properties by grouping values by shape and chaining each
    group's entities in submission order via the ordering property.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        values: List of values to process
        default_graph_uri: Default graph URI for quad stores
        ordered_by: URI of the ordering property
    """
    # Group values by their entity shape; shapeless values share one bucket.
    grouped_values = {}
    for item in values:
        shape_key = item.get("entity_shape") or "default_shape"
        grouped_values.setdefault(shape_key, []).append(item)

    for group in grouped_values.values():
        previous_entity = None
        for item in group:
            current_uri = process_ordered_entity_value(
                editor, entity_uri, predicate, item, default_graph_uri
            )

            # Chain this entity after the previous one in the sequence.
            if previous_entity:
                editor.create(
                    previous_entity,
                    URIRef(ordered_by),
                    current_uri,
                    default_graph_uri,
                )
            previous_entity = current_uri
def process_unordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, matching_field_def):
    """
    Process unordered property values, delegating each one to
    process_entity_value.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        values: List of values to process
        default_graph_uri: Default graph URI for quad stores
        matching_field_def: Field definition for datatype validation
    """
    for item in values:
        process_entity_value(
            editor, entity_uri, predicate, item, default_graph_uri, matching_field_def
        )
def determine_datatype(value, datatype_uris):
    """
    Return the first candidate datatype URI whose validator accepts *value*;
    fall back to XSD.string when none match.
    """
    for candidate in datatype_uris:
        # Look up the validation function registered for this datatype.
        checker = next(
            (entry[1] for entry in DATATYPE_MAPPING if str(entry[0]) == str(candidate)),
            None,
        )
        if checker and checker(value):
            return URIRef(candidate)
    # If none match, default to XSD.string
    return XSD.string
def validate_entity_data(structured_data):
    """
    Validates entity data against form field definitions, considering shape matching.

    Checks, per property: known predicate, min/max cardinality, mandatory
    values, datatype validity, and permitted optional values; nested entity
    dicts are validated recursively. Also reports required properties that
    are entirely absent from the data.

    Args:
        structured_data (dict): Data to validate containing entity_type and properties

    Returns:
        list: List of validation error messages, empty if validation passes
    """
    custom_filter = get_custom_filter()
    form_fields = get_form_fields()

    errors = []
    entity_type = structured_data.get("entity_type")
    entity_shape = structured_data.get("entity_shape")

    if not entity_type:
        errors.append(gettext("Entity type is required"))
        return errors

    entity_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    if not entity_key:
        errors.append(f"No form fields found for entity type: {entity_type}" +
                      (f" and shape: {entity_shape}" if entity_shape else ""))
        return errors

    entity_fields = form_fields[entity_key]
    properties = structured_data.get("properties", {})

    for prop_uri, prop_values in properties.items():
        # rdf:type is handled by entity creation itself, not validated here.
        if URIRef(prop_uri) == RDF.type:
            continue

        field_definitions = entity_fields.get(prop_uri)
        if not field_definitions:
            errors.append(
                gettext(
                    "Unknown property %(prop_uri)s for entity type %(entity_type)s",
                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                    entity_type=custom_filter.human_readable_class(entity_key),
                )
            )
            continue

        if not isinstance(prop_values, list):
            prop_values = [prop_values]

        # The shape declared on the first value selects which field
        # definition applies to this property.
        property_shape = None
        if prop_values and isinstance(prop_values[0], dict):
            property_shape = prop_values[0].get("shape")

        matching_field_def = None
        for field_def in field_definitions:
            if property_shape:
                if field_def.get("subjectShape") == property_shape:
                    matching_field_def = field_def
                    break
            else:
                if not field_def.get("subjectShape"):
                    matching_field_def = field_def
                    break

        # Fall back to the first definition when no shape-specific match exists.
        if not matching_field_def and field_definitions:
            matching_field_def = field_definitions[0]

        if matching_field_def:
            # Cardinality checks (min/max number of values).
            min_count = matching_field_def.get("min", 0)
            max_count = matching_field_def.get("max", None)
            value_count = len(prop_values)

            if value_count < min_count:
                value = gettext("values") if min_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s requires at least %(min_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        min_count=min_count,
                        value=value,
                    )
                )
            if max_count is not None and value_count > max_count:
                value = gettext("values") if max_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s allows at most %(max_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        max_count=max_count,
                        value=value,
                    )
                )

            # Values the schema declares as mandatory must all be present.
            mandatory_values = matching_field_def.get("mandatory_values", [])
            for mandatory_value in mandatory_values:
                if mandatory_value not in prop_values:
                    errors.append(
                        gettext(
                            "Property %(prop_uri)s requires the value %(mandatory_value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            mandatory_value=mandatory_value,
                        )
                    )

            for value in prop_values:
                if isinstance(value, dict) and "entity_type" in value:
                    # Nested entity: validate recursively with its own definitions.
                    nested_errors = validate_entity_data(value)
                    errors.extend(nested_errors)
                else:
                    # Literal value: check it against the allowed datatypes.
                    datatypes = matching_field_def.get("datatypes", [])
                    if datatypes:
                        is_valid_datatype = False
                        for dtype in datatypes:
                            validation_func = next(
                                (
                                    d[1]
                                    for d in DATATYPE_MAPPING
                                    if d[0] == URIRef(dtype)
                                ),
                                None,
                            )
                            if validation_func and validation_func(value):
                                is_valid_datatype = True
                                break
                        if not is_valid_datatype:
                            expected_types = ", ".join(
                                [
                                    custom_filter.human_readable_predicate(dtype, entity_key)
                                    for dtype in datatypes
                                ]
                            )
                            errors.append(
                                gettext(
                                    'Value "%(value)s" for property %(prop_uri)s is not of expected type %(expected_types)s',
                                    value=value,
                                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                    expected_types=expected_types
                                )
                            )

                    # When a closed list of permitted values exists, enforce it.
                    optional_values = matching_field_def.get("optionalValues", [])
                    if optional_values and value not in optional_values:
                        acceptable_values = ", ".join(
                            [
                                custom_filter.human_readable_predicate(val, entity_key)
                                for val in optional_values
                            ]
                        )
                        errors.append(
                            gettext(
                                'Value "%(value)s" is not permitted for property %(prop_uri)s. Acceptable values are: %(acceptable_values)s',
                                value=value,
                                prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                acceptable_values=acceptable_values
                            )
                        )

    # In the RDF model, a property with zero values is equivalent to the property being absent,
    # as a triple requires a subject, predicate, and object. Therefore, this section checks for
    # properties defined in the schema that are completely absent from the input data but are
    # required (min_count > 0). This complements the cardinality check above, which only
    # validates properties that are present in the data.
    # Check for missing required properties
    for prop_uri, field_definitions in entity_fields.items():
        if prop_uri not in properties:
            for field_def in field_definitions:
                min_count = field_def.get("min", 0)
                if min_count > 0:
                    value = gettext("values") if min_count > 1 else gettext("value")
                    errors.append(
                        gettext(
                            "Missing required property: %(prop_uri)s requires at least %(min_count)d %(value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            min_count=min_count,
                            value=value,
                        )
                    )
                    break  # Only need to report once per property

    return errors
@entity_bp.route("/entity-history/<path:entity_uri>")
@login_required
def entity_history(entity_uri):
    """
    Display the history of changes for an entity.

    Builds a TimelineJS-style data structure: one event per provenance
    snapshot, each with the responsible agent, primary source, description,
    parsed modifications, and view/restore links.

    Args:
        entity_uri: URI of the entity
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    # Provenance snapshots sorted chronologically by generation time.
    sorted_metadata = sorted(
        provenance[entity_uri].items(),
        key=lambda x: convert_to_datetime(x[1]["generatedAtTime"]),
    )
    sorted_timestamps = [
        convert_to_datetime(meta["generatedAtTime"], stringify=True)
        for _, meta in sorted_metadata
    ]

    # Get correct context for entity label
    latest_metadata = sorted_metadata[-1][1] if sorted_metadata else None
    is_latest_deletion = (
        latest_metadata
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    )
    # For deleted entities use the last pre-deletion snapshot as context.
    if is_latest_deletion and len(sorted_timestamps) > 1:
        context_snapshot = history[entity_uri][sorted_timestamps[-2]]
    else:
        context_snapshot = history[entity_uri][sorted_timestamps[-1]]

    entity_classes = [str(triple[2]) for triple in context_snapshot.triples((URIRef(entity_uri), RDF.type, None))]
    highest_priority_class = get_highest_priority_class(entity_classes)

    snapshot_entity_shape = determine_shape_for_entity_triples(
        list(context_snapshot.triples((URIRef(entity_uri), None, None)))
    )

    # Generate timeline events
    events = []
    for i, (snapshot_uri, metadata) in enumerate(sorted_metadata):
        date = convert_to_datetime(metadata["generatedAtTime"])
        snapshot_timestamp_str = convert_to_datetime(
            metadata["generatedAtTime"], stringify=True
        )
        snapshot_graph = history[entity_uri][snapshot_timestamp_str]

        responsible_agent = custom_filter.format_agent_reference(
            metadata["wasAttributedTo"]
        )
        primary_source = custom_filter.format_source_reference(
            metadata["hadPrimarySource"]
        )

        description = _format_snapshot_description(
            metadata,
            entity_uri,
            highest_priority_class,
            context_snapshot,
            history,
            sorted_timestamps,
            i,
            custom_filter,
        )
        # Render the snapshot's SPARQL update (if any) as human-readable text.
        modifications = metadata.get("hasUpdateQuery", "")
        modification_text = ""
        if modifications:
            parsed_modifications = parse_sparql_update(modifications)
            modification_text = generate_modification_text(
                parsed_modifications,
                highest_priority_class,
                snapshot_entity_shape,
                history=history,
                entity_uri=entity_uri,
                current_snapshot=snapshot_graph,
                current_snapshot_timestamp=snapshot_timestamp_str,
                custom_filter=custom_filter,
            )

        # Check if this version can be restored (not the latest version and there are multiple versions)
        can_restore = len(sorted_metadata) > 1 and i + 1 < len(sorted_metadata)
        restore_button = ""
        if can_restore:
            restore_button = f"""
                <form action='/restore-version/{entity_uri}/{metadata["generatedAtTime"]}' method='post' class='d-inline restore-form'>
                    <button type='submit' class='btn btn-success restore-btn'>
                        <i class='bi bi-arrow-counterclockwise me-1'></i>{gettext('Restore')}
                    </button>
                </form>
            """

        event = {
            "start_date": {
                "year": date.year,
                "month": date.month,
                "day": date.day,
                "hour": date.hour,
                "minute": date.minute,
                "second": date.second,
            },
            "text": {
                "headline": gettext("Snapshot") + " " + str(i + 1),
                "text": f"""
                    <p><strong>{gettext('Responsible agent')}:</strong> {responsible_agent}</p>
                    <p><strong>{gettext('Primary source')}:</strong> {primary_source}</p>
                    <p><strong>{gettext('Description')}:</strong> {description}</p>
                    <div class="modifications mb-3">
                        {modification_text}
                    </div>
                    <div class="d-flex gap-2 mt-2">
                        <a href='/entity-version/{entity_uri}/{metadata["generatedAtTime"]}' class='btn btn-outline-primary view-version' target='_self'>{gettext('View version')}</a>
                        {restore_button}
                    </div>
                """,
            },
            "autolink": False,
        }

        # Each event spans until the next snapshot's generation time.
        if i + 1 < len(sorted_metadata):
            next_date = convert_to_datetime(
                sorted_metadata[i + 1][1]["generatedAtTime"]
            )
            event["end_date"] = {
                "year": next_date.year,
                "month": next_date.month,
                "day": next_date.day,
                "hour": next_date.hour,
                "minute": next_date.minute,
                "second": next_date.second,
            }

        events.append(event)

    entity_label = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, snapshot_entity_shape), context_snapshot
    )

    timeline_data = {
        "entityUri": entity_uri,
        "entityLabel": entity_label,
        "entityClasses": list(entity_classes),
        "entityShape": snapshot_entity_shape,
        "events": events,
    }

    return render_template("entity/history.jinja", timeline_data=timeline_data)
def _format_snapshot_description(
    metadata: dict,
    entity_uri: str,
    highest_priority_class: str,
    context_snapshot: Graph,
    history: dict,
    sorted_timestamps: list[str],
    current_index: int,
    custom_filter: Filter,
) -> str:
    """
    Format a snapshot description, replacing entity URIs with readable labels.

    For merge snapshots (snapshots derived from more than one predecessor),
    the URI of the merged entity mentioned in the description is replaced
    with a human-readable label resolved against the snapshot that precedes
    the current one. The main entity's URI is likewise replaced with its
    label when one is available.

    Args:
        metadata: The snapshot metadata dictionary.
        entity_uri: The URI of the main entity.
        highest_priority_class: The highest priority class for the entity.
        context_snapshot: The graph snapshot for context.
        history: The history dictionary containing snapshots.
        sorted_timestamps: Sorted list of snapshot timestamps.
        current_index: The index of the current snapshot in sorted_timestamps.
        custom_filter: The custom filter instance for formatting.

    Returns:
        The formatted description string.
    """
    description = metadata.get("description", "")

    # A snapshot derived from more than one predecessor is a merge snapshot.
    is_merge_snapshot = False
    was_derived_from = metadata.get('wasDerivedFrom')
    if isinstance(was_derived_from, list) and len(was_derived_from) > 1:
        is_merge_snapshot = True

    if is_merge_snapshot:
        # Regex to find URI after "merged with", potentially enclosed in single quotes or none
        match = re.search(r"merged with ['‘]?([^'’<>\s]+)['’]?", description)
        if match:
            potential_merged_uri = match.group(1)
            if validators.url(potential_merged_uri):
                merged_entity_uri_from_desc = potential_merged_uri
                merged_entity_label = None
                if current_index > 0:
                    # Resolve the merged entity's label from the snapshot that
                    # precedes the merge, where the merged entity still existed.
                    previous_snapshot_timestamp = sorted_timestamps[current_index - 1]
                    previous_snapshot_graph = history.get(entity_uri, {}).get(previous_snapshot_timestamp)
                    if previous_snapshot_graph:
                        raw_merged_entity_classes = [
                            str(o)
                            for s, p, o in previous_snapshot_graph.triples(
                                (URIRef(merged_entity_uri_from_desc), RDF.type, None)
                            )
                        ]
                        highest_priority_merged_class = get_highest_priority_class(
                            raw_merged_entity_classes
                        ) if raw_merged_entity_classes else None

                        shape = determine_shape_for_classes(raw_merged_entity_classes)
                        merged_entity_label = custom_filter.human_readable_entity(
                            merged_entity_uri_from_desc,
                            (highest_priority_merged_class, shape),
                            previous_snapshot_graph,
                        )
                        # Only substitute when a genuine label (not the bare URI)
                        # could be resolved.
                        if (
                            merged_entity_label
                            and merged_entity_label != merged_entity_uri_from_desc
                        ):
                            description = description.replace(
                                match.group(0), f"merged with '{merged_entity_label}'"
                            )

    # Replace the quoted main-entity URI with its label, if one exists.
    shape = determine_shape_for_classes([highest_priority_class])
    entity_label_for_desc = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, shape), context_snapshot
    )
    if entity_label_for_desc and entity_label_for_desc != entity_uri:
        description = description.replace(f"'{entity_uri}'", f"'{entity_label_for_desc}'")

    return description
@entity_bp.route("/entity-version/<path:entity_uri>/<timestamp>")
@login_required
def entity_version(entity_uri: str, timestamp: str):
    """
    Display a specific version of an entity.

    The timestamp may be an ISO datetime or a snapshot identifier; in the
    latter case the generation time is resolved from the provenance store.
    The snapshot closest in time to the request is rendered; for deletion
    snapshots the preceding snapshot is used as display context.

    Args:
        entity_uri: URI of the entity
        timestamp: Timestamp of the version to display
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    try:
        timestamp_dt = datetime.fromisoformat(timestamp)
    except ValueError:
        # Not an ISO datetime: treat it as a snapshot suffix and resolve its
        # generation time from the provenance store.
        provenance_sparql = get_provenance_sparql()
        query_timestamp = f"""
        SELECT ?generation_time
        WHERE {{
            <{entity_uri}/prov/se/{timestamp}> <http://www.w3.org/ns/prov#generatedAtTime> ?generation_time.
        }}
        """
        provenance_sparql.setQuery(query_timestamp)
        provenance_sparql.setReturnFormat(JSON)
        try:
            generation_time = provenance_sparql.queryAndConvert()["results"][
                "bindings"
            ][0]["generation_time"]["value"]
        except IndexError:
            # No snapshot with this identifier exists.
            abort(404)
        timestamp = generation_time
        timestamp_dt = datetime.fromisoformat(generation_time)

    # Full history including related/merged entities and reverse relations.
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    main_entity_history = history.get(entity_uri, {})
    # Chronological ordering of the entity's snapshot timestamps.
    sorted_timestamps = sorted(
        main_entity_history.keys(), key=lambda t: convert_to_datetime(t)
    )

    if not sorted_timestamps:
        abort(404)

    # Snapshot whose timestamp is nearest to the requested time
    # (both sides normalized to aware datetimes before comparison).
    closest_timestamp = min(
        sorted_timestamps,
        key=lambda t: abs(
            convert_to_datetime(t).astimezone() - timestamp_dt.astimezone()
        ),
    )

    version = main_entity_history[closest_timestamp]
    triples = list(version.triples((URIRef(entity_uri), None, None)))

    entity_metadata = provenance.get(entity_uri, {})
    closest_metadata = None
    min_time_diff = None

    # NOTE(review): string max over ISO timestamps — assumes uniform
    # formatting/offsets so lexicographic and chronological order agree.
    latest_timestamp = max(sorted_timestamps)
    latest_metadata = None

    # Single pass: find the metadata closest to the requested time and the
    # metadata of the latest snapshot.
    for se_uri, meta in entity_metadata.items():
        meta_time = convert_to_datetime(meta["generatedAtTime"])
        time_diff = abs((meta_time - timestamp_dt).total_seconds())

        if closest_metadata is None or time_diff < min_time_diff:
            closest_metadata = meta
            min_time_diff = time_diff

        if meta["generatedAtTime"] == latest_timestamp:
            latest_metadata = meta

    if closest_metadata is None or latest_metadata is None:
        abort(404)

    # A snapshot is a deletion when it is the latest one and was invalidated,
    # or when the entity has no triples at this point in time.
    is_deletion_snapshot = (
        closest_timestamp == latest_timestamp
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    ) or len(triples) == 0

    # For deletions, render against the previous snapshot so the deleted
    # entity's data is still visible as context.
    context_version = version
    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        current_index = sorted_timestamps.index(closest_timestamp)
        if current_index > 0:
            context_version = main_entity_history[sorted_timestamps[current_index - 1]]

    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        subject_classes = [
            o
            for _, _, o in context_version.triples((URIRef(entity_uri), RDF.type, None))
        ]
    else:
        subject_classes = [
            o for _, _, o in version.triples((URIRef(entity_uri), RDF.type, None))
        ]

    highest_priority_class = get_highest_priority_class(subject_classes)

    entity_shape = determine_shape_for_entity_triples(
        list(context_version.triples((URIRef(entity_uri), None, None)))
    )

    # Only the final element (the list of valid predicates) is used here.
    _, _, _, _, _, valid_predicates = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

    grouped_triples, relevant_properties = get_grouped_triples(
        entity_uri,
        triples,
        valid_predicates,
        historical_snapshot=context_version,
        highest_priority_class=highest_priority_class,
        highest_priority_shape=entity_shape
    )

    # De-duplicated, chronologically sorted snapshot times used for the
    # version number and prev/next navigation.
    snapshot_times = [
        convert_to_datetime(meta["generatedAtTime"])
        for meta in entity_metadata.values()
    ]
    snapshot_times = sorted(set(snapshot_times))
    # NOTE(review): assumes timestamp_dt exactly matches one of the snapshot
    # times (guaranteed when resolved from provenance above) — otherwise
    # .index() raises ValueError; confirm for hand-crafted URLs.
    version_number = snapshot_times.index(timestamp_dt) + 1

    next_snapshot_timestamp = None
    prev_snapshot_timestamp = None

    # First snapshot strictly after the requested time.
    for snap_time in snapshot_times:
        if snap_time > timestamp_dt:
            next_snapshot_timestamp = snap_time.isoformat()
            break

    # Last snapshot strictly before the requested time.
    for snap_time in reversed(snapshot_times):
        if snap_time < timestamp_dt:
            prev_snapshot_timestamp = snap_time.isoformat()
            break

    # Human-readable summary of the SPARQL update that produced this snapshot.
    modifications = ""
    if closest_metadata.get("hasUpdateQuery"):
        sparql_query = closest_metadata["hasUpdateQuery"]
        parsed_modifications = parse_sparql_update(sparql_query)
        modifications = generate_modification_text(
            parsed_modifications,
            highest_priority_class,
            entity_shape,
            history,
            entity_uri,
            context_version,
            closest_timestamp,
            custom_filter,
        )

    try:
        current_index = sorted_timestamps.index(closest_timestamp)
    except ValueError:
        current_index = -1

    # Replace raw URIs in the snapshot description with readable labels.
    if closest_metadata.get("description"):
        formatted_description = _format_snapshot_description(
            closest_metadata,
            entity_uri,
            highest_priority_class,
            context_version,
            history,
            sorted_timestamps,
            current_index,
            custom_filter,
        )
        closest_metadata["description"] = formatted_description

    closest_timestamp = closest_metadata["generatedAtTime"]

    return render_template(
        "entity/version.jinja",
        subject=entity_uri,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        metadata={closest_timestamp: closest_metadata},
        timestamp=closest_timestamp,
        next_snapshot_timestamp=next_snapshot_timestamp,
        prev_snapshot_timestamp=prev_snapshot_timestamp,
        modifications=modifications,
        grouped_triples=grouped_triples,
        version_number=version_number,
        version=context_version,
    )
@entity_bp.route("/restore-version/<path:entity_uri>/<timestamp>", methods=["POST"])
@login_required
def restore_version(entity_uri: str, timestamp: str):
    """
    Restore an entity to a previous version.

    Computes the difference between the current dataset state and the
    historical snapshot, then replays deletions and additions through an
    Editor so the restoration is recorded in provenance.

    Args:
        entity_uri: URI of the entity to restore
        timestamp: Timestamp of the version to restore to
    """
    # Normalize the timestamp into the canonical string used as history key.
    timestamp = convert_to_datetime(timestamp, stringify=True)
    change_tracking_config = get_change_tracking_config()

    # Get entity history
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    historical_graph = history.get(entity_uri, {}).get(timestamp)
    if historical_graph is None:
        abort(404)

    current_graph = fetch_current_state_with_related_entities(provenance)

    # The entity counts as deleted when it has no triples in the current state.
    is_deleted = len(list(current_graph.triples((URIRef(entity_uri), None, None)))) == 0

    triples_or_quads_to_delete, triples_or_quads_to_add = compute_graph_differences(
        current_graph, historical_graph
    )

    # Get all entities that need restoration
    entities_to_restore = get_entities_to_restore(
        triples_or_quads_to_delete, triples_or_quads_to_add, entity_uri
    )

    # Prepare snapshot information for all entities
    entity_snapshots = prepare_entity_snapshots(
        entities_to_restore, provenance, timestamp
    )

    # Create editor instance
    editor = Editor(
        get_dataset_endpoint(),
        get_provenance_endpoint(),
        current_app.config["COUNTER_HANDLER"],
        URIRef(get_responsible_agent_uri(current_user.orcid)),
        # A deleted entity has no usable current source snapshot.
        None if is_deleted else entity_snapshots[entity_uri]["source"],
        current_app.config["DATASET_GENERATION_TIME"],
        dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
    )

    # Import current state into editor
    if get_dataset_is_quadstore():
        for quad in current_graph.quads():
            editor.g_set.add(quad)
    else:
        for triple in current_graph:
            editor.g_set.add(triple)

    # Everything added so far is pre-existing data, not part of this change.
    editor.preexisting_finished()

    # Apply deletions
    for item in triples_or_quads_to_delete:
        # Items are quads (s, p, o, g) in a quadstore, triples otherwise.
        if len(item) == 4:
            editor.delete(item[0], item[1], item[2], item[3])
        else:
            editor.delete(item[0], item[1], item[2])

        # Mark previously-deleted entities so provenance records which
        # snapshot they are being restored from.
        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["restoration_source"] = (
                    entity_info["source"]
                )

    # Apply additions
    for item in triples_or_quads_to_add:
        if len(item) == 4:
            editor.create(item[0], item[1], item[2], item[3])
        else:
            editor.create(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["source"] = entity_info[
                    "source"
                ]

    # Handle main entity restoration if needed
    if is_deleted and entity_uri in entity_snapshots:
        editor.g_set.mark_as_restored(URIRef(entity_uri))
        source = entity_snapshots[entity_uri]["source"]
        editor.g_set.entity_index[URIRef(entity_uri)]["source"] = source

    try:
        editor.save()
        flash(gettext("Version restored successfully"), "success")
    except Exception as e:
        flash(
            gettext(
                "An error occurred while restoring the version: %(error)s", error=str(e)
            ),
            "error",
        )

    return redirect(url_for("entity.about", subject=entity_uri))
def compute_graph_differences(
    current_graph: Graph | ConjunctiveGraph, historical_graph: Graph | ConjunctiveGraph
):
    """
    Compute what must change to turn the current state into the historical one.

    When the dataset is a quadstore the comparison is quad-based, otherwise
    triple-based.

    Returns:
        A pair (to_delete, to_add): statements present only in the current
        graph, and statements present only in the historical graph.
    """
    use_quads = get_dataset_is_quadstore()

    def materialize(graph):
        # Snapshot the graph contents as a set for fast difference operations.
        if use_quads:
            return set(graph.quads())
        return set(graph.triples((None, None, None)))

    current_data = materialize(current_graph)
    historical_data = materialize(historical_graph)

    to_delete = current_data - historical_data
    to_add = historical_data - current_data
    return to_delete, to_add
def get_entities_to_restore(
    triples_or_quads_to_delete: set, triples_or_quads_to_add: set, main_entity_uri: str
) -> set:
    """
    Identify all entities that need to be restored based on the graph differences.

    Any URI appearing as subject or object of a changed statement is a
    candidate, except in rdf:type statements, which never pull in related
    entities.

    Args:
        triples_or_quads_to_delete: Set of triples/quads to be deleted
        triples_or_quads_to_add: Set of triples/quads to be added
        main_entity_uri: URI of the main entity being restored

    Returns:
        Set of entity URIs that need to be restored
    """
    rdf_type_uri = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
    entities_to_restore = {main_entity_uri}

    for item in list(triples_or_quads_to_delete) + list(triples_or_quads_to_add):
        # Class assertions do not reference restorable related entities.
        if str(item[1]) == rdf_type_uri:
            continue

        for term in (str(item[0]), str(item[2])):
            if term != main_entity_uri and validators.url(term):
                entities_to_restore.add(term)

    return entities_to_restore
def prepare_entity_snapshots(
    entities_to_restore: set, provenance: dict, target_time: str
) -> dict:
    """
    Prepare snapshot information for all entities that need to be restored.

    For every entity that has provenance, the most recent snapshot at or
    before the target time is chosen as the restoration source; the entity
    is flagged for restore when its latest snapshot marks it as deleted.

    Args:
        entities_to_restore: Set of entity URIs to process
        provenance: Dictionary containing provenance data for all entities
        target_time: Target restoration time

    Returns:
        Dictionary mapping entity URIs to their restoration information
    """
    entity_snapshots = {}

    for entity_uri in entities_to_restore:
        snapshots = provenance.get(entity_uri)
        if not snapshots:
            continue

        # Most recent usable snapshot at or before the target time.
        source_snapshot = find_appropriate_snapshot(snapshots, target_time)
        if not source_snapshot:
            continue

        # An entity is deleted when its latest snapshot was invalidated at
        # the very moment it was generated.
        ordered = sorted(
            snapshots.values(),
            key=lambda meta: convert_to_datetime(meta["generatedAtTime"]),
        )
        latest = ordered[-1]
        is_deleted = (
            latest.get("invalidatedAtTime")
            and latest["generatedAtTime"] == latest["invalidatedAtTime"]
        )

        entity_snapshots[entity_uri] = {
            "source": source_snapshot,
            "needs_restore": is_deleted,
        }

    return entity_snapshots
def find_appropriate_snapshot(provenance_data: dict, target_time: str) -> Optional[str]:
    """
    Find the most appropriate snapshot to use as a source for restoration.

    Deletion snapshots (generation time equal to invalidation time) are
    skipped; among the remaining snapshots generated at or before the target
    time, the most recent one is returned.

    Args:
        provenance_data: Dictionary of snapshots and their metadata for an entity
        target_time: The target restoration time as ISO format string

    Returns:
        The URI of the most appropriate snapshot, or None if no suitable snapshot is found
    """
    target_datetime = convert_to_datetime(target_time)

    best_time = None
    best_uri = None
    for snapshot_uri, metadata in provenance_data.items():
        # Deletion snapshots cannot serve as restoration sources.
        invalidated = metadata.get("invalidatedAtTime")
        if invalidated and metadata["generatedAtTime"] == invalidated:
            continue

        generation_time = convert_to_datetime(metadata["generatedAtTime"])
        if generation_time > target_datetime:
            continue

        # ">=" keeps the later-iterated snapshot on ties, matching a stable
        # sort followed by taking the last element.
        if best_time is None or generation_time >= best_time:
            best_time = generation_time
            best_uri = snapshot_uri

    return best_uri
def determine_object_class_and_shape(object_value: str, relevant_snapshot: Graph) -> tuple[Optional[str], Optional[str]]:
    """
    Determine the class and shape for an object value from a graph snapshot.

    Args:
        object_value: The object value (URI or literal)
        relevant_snapshot: Graph snapshot to query for object information

    Returns:
        Tuple of (object_class, object_shape_uri) or (None, None) if not determinable
    """
    # Literals, and any value without a snapshot to query, carry no class/shape.
    if not validators.url(str(object_value)) or not relevant_snapshot:
        return None, None

    object_ref = URIRef(object_value)

    described_triples = list(relevant_snapshot.triples((object_ref, None, None)))
    if not described_triples:
        return None, None

    shape_uri = determine_shape_for_entity_triples(described_triples)

    type_values = [
        str(type_obj)
        for _, _, type_obj in relevant_snapshot.triples((object_ref, RDF.type, None))
    ]
    best_class = get_highest_priority_class(type_values) if type_values else None

    return best_class, shape_uri
def generate_modification_text(
    modifications,
    highest_priority_class,
    entity_shape,
    history,
    entity_uri,
    current_snapshot,
    current_snapshot_timestamp,
    custom_filter: Filter,
) -> str:
    """
    Generate HTML text describing modifications to an entity, using display rules for property ordering.

    Args:
        modifications (dict): Dictionary of modifications from parse_sparql_update
        highest_priority_class (str): The highest priority class for the subject entity
        entity_shape (str): The shape for the subject entity
        history (dict): Historical snapshots dictionary
        entity_uri (str): URI of the entity being modified
        current_snapshot (Graph): Current entity snapshot
        current_snapshot_timestamp (str): Timestamp of current snapshot
        custom_filter (Filter): Filter instance for formatting

    Returns:
        str: HTML text describing the modifications
    """
    modification_text = "<p><strong>" + gettext("Modifications") + "</strong></p>"

    # Predicate ordering defined by the display rules for this class/shape.
    ordered_properties = get_property_order_from_rules(highest_priority_class, entity_shape)

    for mod_type, triples in modifications.items():
        modification_text += "<ul class='list-group mb-3'><p>"
        if mod_type == gettext("Additions"):
            modification_text += '<i class="bi bi-plus-circle-fill text-success"></i>'
        elif mod_type == gettext("Deletions"):
            modification_text += '<i class="bi bi-dash-circle-fill text-danger"></i>'
        modification_text += " <em>" + gettext(mod_type) + "</em></p>"

        # Per-object caches so each object's class/shape is resolved once.
        object_shapes_cache = {}
        object_classes_cache = {}

        # For deletions, labels must be resolved against the snapshot that
        # preceded this one (the deleted objects are no longer present in the
        # current snapshot); otherwise the current snapshot is used.
        relevant_snapshot = None
        if (
            mod_type == gettext("Deletions")
            and history
            and entity_uri
            and current_snapshot_timestamp
        ):
            sorted_timestamps = sorted(history[entity_uri].keys())
            current_index = sorted_timestamps.index(current_snapshot_timestamp)
            if current_index > 0:
                relevant_snapshot = history[entity_uri][
                    sorted_timestamps[current_index - 1]
                ]
        else:
            relevant_snapshot = current_snapshot

        if relevant_snapshot:
            for triple in triples:
                object_value = triple[2]
                object_class, object_shape = determine_object_class_and_shape(object_value, relevant_snapshot)
                object_classes_cache[str(object_value)] = object_class
                object_shapes_cache[str(object_value)] = object_shape

        # Group triples by (predicate, object shape) and cache ordering info.
        predicate_shape_groups = {}
        predicate_ordering_cache = {}
        entity_position_cache = {}

        for triple in triples:
            predicate = str(triple[1])
            object_value = str(triple[2])
            object_shape_uri = object_shapes_cache.get(object_value)

            if predicate not in predicate_ordering_cache:
                predicate_ordering_cache[predicate] = get_predicate_ordering_info(predicate, highest_priority_class, entity_shape)

            # Pre-compute the object's position within its ordered sequence
            # (only for ordered predicates with URI objects).
            order_property = predicate_ordering_cache[predicate]
            if order_property and validators.url(object_value) and relevant_snapshot:
                position_key = (object_value, predicate)
                if position_key not in entity_position_cache:
                    entity_position_cache[position_key] = get_entity_position_in_sequence(
                        object_value, entity_uri, predicate, order_property, relevant_snapshot
                    )

            group_key = (predicate, object_shape_uri)
            if group_key not in predicate_shape_groups:
                predicate_shape_groups[group_key] = []
            predicate_shape_groups[group_key].append(triple)

        processed_predicates = set()

        def get_cached_position(triple, predicate_uri):
            # Sort key: cached sequence position; objects without one go last.
            object_value = str(triple[2])
            position_key = (object_value, predicate_uri)
            return entity_position_cache.get(position_key, float('inf'))

        # First pass: emit predicates in display-rules order, each predicate's
        # groups sorted by the shape priority from the display rules.
        for predicate in ordered_properties:
            shape_order = get_shape_order_from_display_rules(highest_priority_class, entity_shape, predicate)
            predicate_groups = []
            for group_key, group_triples in predicate_shape_groups.items():
                predicate_uri, object_shape_uri = group_key
                if predicate_uri == predicate:
                    if object_shape_uri and object_shape_uri in shape_order:
                        shape_priority = shape_order.index(object_shape_uri)
                    else:
                        # Objects without shapes or shapes not in display rules go at the end
                        shape_priority = len(shape_order)

                    predicate_groups.append((shape_priority, group_key, group_triples))

            predicate_groups.sort(key=lambda x: x[0])

            for _, group_key, group_triples in predicate_groups:
                processed_predicates.add(group_key)

                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                # Ordered predicates list their objects by sequence position.
                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        # Then handle any remaining predicate+shape groups not in the ordered list
        for group_key, group_triples in predicate_shape_groups.items():
            if group_key not in processed_predicates:
                # Sort remaining triples by their cached positions too
                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        modification_text += "</ul>"

    return modification_text
def format_triple_modification(
    triple: Tuple[URIRef, URIRef, URIRef|Literal],
    highest_priority_class: str,
    entity_shape: str,
    object_shapes_cache: dict,
    object_classes_cache: dict,
    relevant_snapshot: Optional[Graph],
    custom_filter: Filter,
    subject_uri: str = None,
    predicate_ordering_cache: Optional[dict] = None,
    entity_position_cache: Optional[dict] = None,
) -> str:
    """
    Render a single triple modification as an HTML list item.

    Args:
        triple: The RDF triple being modified
        highest_priority_class: The highest priority class for the subject entity
        entity_shape: The shape for the subject entity
        object_shapes_cache: Pre-computed cache of object shapes
        object_classes_cache: Pre-computed cache of object classes
        relevant_snapshot: Graph snapshot for context
        custom_filter (Filter): Filter instance for formatting
        subject_uri: URI of the subject entity (for ordering queries)
        predicate_ordering_cache: Cache mapping predicates to order properties
        entity_position_cache: Cache mapping (object, predicate) to positions

    Returns:
        str: HTML text describing the modification
    """
    predicate = triple[1]
    object_value = triple[2]
    object_key = str(object_value)

    shape_for_object = object_shapes_cache.get(object_key)

    predicate_label = custom_filter.human_readable_predicate(
        predicate, (highest_priority_class, entity_shape), object_shape_uri=shape_for_object
    )

    object_label = get_object_label(
        object_value,
        predicate,
        shape_for_object,
        object_classes_cache.get(object_key),
        relevant_snapshot,
        custom_filter,
        subject_entity_key=(highest_priority_class, entity_shape),
    )

    # Append a "#n" badge when the predicate is ordered and the object's
    # position in the sequence was pre-computed.
    order_info = ""
    if subject_uri and validators.url(object_key):
        if predicate_ordering_cache and entity_position_cache:
            if predicate_ordering_cache.get(str(predicate)):
                position = entity_position_cache.get((object_key, str(predicate)))
                if position is not None:
                    order_info = f' <span class="order-position-badge">#{position}</span>'

    return f"""
        <li class='d-flex align-items-center'>
            <span class='flex-grow-1 d-flex flex-column justify-content-center ms-3 mb-2 w-100'>
                <strong>{predicate_label}{order_info}</strong>
                <span class="object-value word-wrap">{object_label}</span>
            </span>
        </li>"""
def get_object_label(
    object_value: str,
    predicate: str,
    object_shape_uri: Optional[str],
    object_class: Optional[str],
    snapshot: Optional[Graph],
    custom_filter: Filter,
    subject_entity_key: Optional[tuple] = None,
) -> str:
    """
    Produce a human-readable label for an object value.

    rdf:type objects are rendered as the subject's class label; URI objects
    with a known class or shape go through the entity formatter; everything
    else falls back to its string form.

    Args:
        object_value: The value to get a label for
        predicate: The predicate URI
        object_shape_uri: Pre-computed shape URI for the object
        object_class: Pre-computed class for the object
        snapshot: Graph snapshot for context (essential for deleted triples)
        custom_filter (Filter): Custom filter instance for formatting
        subject_entity_key: Tuple of (class, shape) for the subject entity

    Returns:
        str: A human-readable label for the object value
    """
    rdf_type_uri = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"

    if str(predicate) == rdf_type_uri:
        return custom_filter.human_readable_class(subject_entity_key)

    if validators.url(object_value) and (object_shape_uri or object_class):
        return custom_filter.human_readable_entity(
            object_value, (object_class, object_shape_uri), snapshot
        )

    return str(object_value)
def process_modification_data(data: dict) -> Tuple[str, List[dict]]:
    """
    Extract the subject URI and the modification list from a request payload.

    Args:
        data: Dictionary containing modification data

    Returns:
        Tuple containing subject URI and list of modification details

    Raises:
        ValueError: If the subject URI or the modification list is missing or empty.
    """
    subject_uri = data.get("subject")
    if not subject_uri:
        raise ValueError("No subject URI provided in modification data")

    modifications = data.get("modifications", [])
    if not modifications:
        raise ValueError("No modifications provided in data")

    return subject_uri, modifications
def validate_modification(
    modification: dict, subject_uri: str
) -> Tuple[bool, str]:
    """
    Validate a single modification operation.

    Checks that an operation and predicate are present and the operation is
    supported, then enforces the SHACL-derived form-field cardinality rules
    (minCount on removal, maxCount on addition) when form fields are defined.

    Args:
        modification: Dictionary containing modification details
        subject_uri: URI of the subject being modified

    Returns:
        Tuple of (is_valid, error_message)
    """
    form_fields = get_form_fields()

    operation = modification.get("operation")
    if not operation:
        return False, "No operation specified in modification"

    predicate = modification.get("predicate")
    if not predicate:
        return False, "No predicate specified in modification"

    if operation not in ("add", "remove", "update"):
        return False, f"Invalid operation: {operation}"

    if form_fields:
        entity_type = modification.get("entity_type")
        entity_shape = modification.get("entity_shape")

        # If entity_type is not provided in modification, get it from the database
        if not entity_type:
            entity_types = get_entity_types(subject_uri)
            if entity_types:
                entity_type = get_highest_priority_class(entity_types)

        matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)
        if matching_key:
            for field in form_fields[matching_key].get(predicate, []):
                # Required predicates cannot be removed entirely.
                if operation == "remove" and field.get("minCount", 0) > 0:
                    return False, f"Cannot remove required predicate: {predicate}"

                # Additions must not exceed the declared maximum cardinality.
                if operation == "add":
                    current_count = get_predicate_count(subject_uri, predicate)
                    max_count = field.get("maxCount")
                    if max_count and current_count >= max_count:
                        return (
                            False,
                            f"Maximum count exceeded for predicate: {predicate}",
                        )

    return True, ""
def get_predicate_count(subject_uri: str, predicate: str) -> int:
    """
    Count how many object values the subject currently has for a predicate.

    Args:
        subject_uri: URI of the entity
        predicate: Predicate URI to count

    Returns:
        Number of values for the predicate
    """
    endpoint = get_sparql()

    count_query = f"""
    SELECT (COUNT(?o) as ?count) WHERE {{
        <{subject_uri}> <{predicate}> ?o .
    }}
    """

    endpoint.setQuery(count_query)
    endpoint.setReturnFormat(JSON)
    response = endpoint.query().convert()

    first_binding = response["results"]["bindings"][0]
    return int(first_binding["count"]["value"])
def _as_rdf_object(value: str, datatype) -> URIRef | Literal:
    """Convert a raw string into a URIRef (for URLs) or a typed Literal."""
    if validators.url(value):
        return URIRef(value)
    return Literal(value, datatype=URIRef(datatype))


def apply_modifications(
    editor: Editor,
    modifications: List[dict],
    subject_uri: str,
    graph_uri: Optional[str] = None,
):
    """
    Apply a list of modifications to an entity.

    Each modification is a dict with an "operation" key ("add", "remove" or
    "update"), a "predicate" key, and operation-specific value keys
    ("value" for add; "oldValue"/"newValue" for update), plus an optional
    "datatype" (defaults to xsd:string). Unknown operations are ignored.

    Args:
        editor: Editor instance to use for modifications
        modifications: List of modification operations
        subject_uri: URI of the entity being modified
        graph_uri: Optional graph URI for quad store
    """
    for mod in modifications:
        operation = mod["operation"]
        predicate = mod["predicate"]

        if operation == "remove":
            # Removes every value of the predicate for this subject.
            editor.delete(URIRef(subject_uri), URIRef(predicate), graph_uri=graph_uri)

        elif operation == "add":
            datatype = mod.get("datatype", XSD.string)
            editor.create(
                URIRef(subject_uri),
                URIRef(predicate),
                _as_rdf_object(mod["value"], datatype),
                graph_uri,
            )

        elif operation == "update":
            datatype = mod.get("datatype", XSD.string)
            editor.update(
                URIRef(subject_uri),
                URIRef(predicate),
                _as_rdf_object(mod["oldValue"], datatype),
                _as_rdf_object(mod["newValue"], datatype),
                graph_uri,
            )