Coverage for heritrace/routes/entity.py: 90%
761 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-01 22:12 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-01 22:12 +0000
1import json
2import re
3from datetime import datetime
4from typing import List, Optional, Tuple
6import validators
7from flask import (Blueprint, abort, current_app, flash, jsonify, redirect,
8 render_template, request, url_for)
9from flask_babel import gettext
10from flask_login import current_user, login_required
11from heritrace.apis.orcid import get_responsible_agent_uri
12from heritrace.editor import Editor
13from heritrace.extensions import (get_change_tracking_config,
14 get_custom_filter, get_dataset_endpoint,
15 get_dataset_is_quadstore, get_display_rules,
16 get_form_fields, get_provenance_endpoint,
17 get_provenance_sparql, get_shacl_graph,
18 get_sparql)
19from heritrace.forms import *
20from heritrace.utils.converters import convert_to_datetime
21from heritrace.utils.datatypes import DATATYPE_MAPPING, get_datatype_options
22from heritrace.utils.display_rules_utils import (
23 get_class_priority, get_grouped_triples, get_highest_priority_class,
24 get_predicate_ordering_info, get_property_order_from_rules,
25 get_shape_order_from_display_rules, is_entity_type_visible)
26from heritrace.utils.filters import Filter
27from heritrace.utils.primary_source_utils import (
28 get_default_primary_source, save_user_default_primary_source)
29from heritrace.utils.shacl_utils import (determine_shape_for_entity_triples,
30 find_matching_form_field,
31 get_entity_position_in_sequence)
32from heritrace.utils.shacl_validation import get_valid_predicates
33from heritrace.utils.sparql_utils import (
34 determine_shape_for_classes, fetch_current_state_with_related_entities,
35 fetch_data_graph_for_subject, get_entity_types, import_referenced_entities,
36 parse_sparql_update)
37from heritrace.utils.uri_utils import generate_unique_uri
38from rdflib import RDF, XSD, ConjunctiveGraph, Graph, Literal, URIRef
39from SPARQLWrapper import JSON
40from time_agnostic_library.agnostic_entity import AgnosticEntity
# Blueprint grouping all entity-related routes (/about, /create-entity, /entity-history, ...).
entity_bp = Blueprint("entity", __name__)
def get_deleted_entity_context_info(is_deleted: bool, sorted_timestamps: List[str],
                                    history: dict, subject: str) -> Tuple[Optional[Graph], Optional[str], Optional[str]]:
    """
    Recover display context for a deleted entity from its penultimate snapshot.

    For an entity that has been deleted and whose history holds at least two
    snapshots, the snapshot immediately preceding the deletion is used to
    derive the entity's highest priority class and its SHACL shape.

    Args:
        is_deleted: Whether the entity is deleted.
        sorted_timestamps: Snapshot timestamps in chronological order.
        history: Mapping of subject -> timestamp -> Graph.
        subject: The entity URI as a string.

    Returns:
        (context_snapshot, highest_priority_class, entity_shape), or
        (None, None, None) when the entity is not deleted or has a single snapshot.
    """
    # Guard: context is only meaningful for deleted entities with a prior snapshot.
    if not is_deleted or len(sorted_timestamps) <= 1:
        return None, None, None

    snapshot = history[subject][sorted_timestamps[-2]]
    subject_ref = URIRef(subject)

    rdf_classes = [
        obj
        for _, _, obj in snapshot.triples((subject_ref, RDF.type, None))
    ]

    priority_class = get_highest_priority_class(rdf_classes)
    shape = determine_shape_for_entity_triples(
        list(snapshot.triples((subject_ref, None, None)))
    )

    return snapshot, priority_class, shape
@entity_bp.route("/about/<path:subject>")
@login_required
def about(subject):
    """
    Display detailed information about an entity.

    Builds the entity's change history, determines whether its latest snapshot
    is a deletion, and (for live entities) fetches the current data graph to
    compute editable predicates and grouped triples for the template.

    Args:
        subject: URI of the entity to display
    """
    change_tracking_config = get_change_tracking_config()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # History is fetched without related/merged/reverse entities: only the
    # subject's own snapshots are needed on this page.
    agnostic_entity = AgnosticEntity(
        res=subject, config=change_tracking_config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    is_deleted = False
    context_snapshot = None
    subject_classes = []
    highest_priority_class = None
    entity_shape = None

    if history.get(subject):
        sorted_timestamps = sorted(history[subject].keys())
        # Provenance record whose generation time matches the newest snapshot.
        latest_metadata = next(
            (
                meta
                for _, meta in provenance[subject].items()
                if meta["generatedAtTime"] == sorted_timestamps[-1]
            ),
            None,
        )

        # A snapshot that carries a non-empty invalidatedAtTime marks a deletion.
        is_deleted = (
            latest_metadata
            and "invalidatedAtTime" in latest_metadata
            and latest_metadata["invalidatedAtTime"]
        )

        context_snapshot, highest_priority_class, entity_shape = get_deleted_entity_context_info(
            is_deleted, sorted_timestamps, history, subject
        )

    grouped_triples = {}
    can_be_added = []
    can_be_deleted = []
    datatypes = {}
    mandatory_values = {}
    optional_values = {}
    valid_predicates = []
    data_graph = None

    # Only live entities have a current data graph to render and edit.
    if not is_deleted:
        data_graph = fetch_data_graph_for_subject(subject)
        if data_graph:
            triples = list(data_graph.triples((None, None, None)))
            subject_classes = [o for s, p, o in data_graph.triples((URIRef(subject), RDF.type, None))]

            highest_priority_class = get_highest_priority_class(subject_classes)
            entity_shape = determine_shape_for_entity_triples(
                list(data_graph.triples((URIRef(subject), None, None)))
            )

            (
                can_be_added,
                can_be_deleted,
                datatypes,
                mandatory_values,
                optional_values,
                valid_predicates,
            ) = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

            grouped_triples, relevant_properties = get_grouped_triples(
                subject, triples, valid_predicates, highest_priority_class=highest_priority_class, highest_priority_shape=entity_shape
            )

            # Restrict addable/deletable predicates to those the display rules consider relevant.
            can_be_added = [uri for uri in can_be_added if uri in relevant_properties]
            can_be_deleted = [
                uri for uri in can_be_deleted if uri in relevant_properties
            ]

    update_form = UpdateTripleForm()

    form_fields = get_form_fields()

    datatype_options = get_datatype_options()

    # Index field definitions by (predicate, entity key, node shape) for O(1)
    # lookups in the template.
    predicate_details_map = {}
    for entity_type_key, predicates in form_fields.items():
        for predicate_uri, details_list in predicates.items():
            for details in details_list:
                shape = details.get("nodeShape")
                key = (predicate_uri, entity_type_key, shape)
                predicate_details_map[key] = details

    return render_template(
        "entity/about.jinja",
        subject=subject,
        history=history,
        can_be_added=can_be_added,
        can_be_deleted=can_be_deleted,
        datatypes=datatypes,
        update_form=update_form,
        mandatory_values=mandatory_values,
        optional_values=optional_values,
        shacl=bool(len(get_shacl_graph())),
        grouped_triples=grouped_triples,
        display_rules=get_display_rules(),
        form_fields=form_fields,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        predicate_details_map=predicate_details_map,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        is_deleted=is_deleted,
        context=context_snapshot,
        default_primary_source=default_primary_source,
        datatype_options=datatype_options,
    )
@entity_bp.route("/create-entity", methods=["GET", "POST"])
@login_required
def create_entity():
    """
    Create a new entity in the dataset.

    GET renders the creation form. POST validates the submitted structured
    data (when SHACL form fields are configured), writes the new entity and
    any nested/referenced entities through an Editor, and returns a JSON
    response with a redirect URL on success or a list of errors otherwise.
    """
    form_fields = get_form_fields()

    default_primary_source = get_default_primary_source(current_user.orcid)

    # Entity (class, shape) keys the user may create, ordered by class priority.
    entity_class_shape_pairs = sorted(
        [
            entity_key
            for entity_key in form_fields.keys()
            if is_entity_type_visible(entity_key)
        ],
        key=lambda et: get_class_priority(et),
        reverse=True,
    )

    datatype_options = get_datatype_options()

    if request.method == "POST":
        structured_data = json.loads(request.form.get("structured_data", "{}"))
        primary_source = request.form.get("primary_source") or None
        save_default_source = request.form.get("save_default_source") == 'true'

        if primary_source and not validators.url(primary_source):
            return jsonify({"status": "error", "errors": [gettext("Invalid primary source URL provided")]}), 400

        # Optionally remember this primary source as the user's default.
        if save_default_source and primary_source and validators.url(primary_source):
            save_user_default_primary_source(current_user.orcid, primary_source)

        editor = Editor(
            get_dataset_endpoint(),
            get_provenance_endpoint(),
            current_app.config["COUNTER_HANDLER"],
            URIRef(get_responsible_agent_uri(current_user.orcid)),
            primary_source,
            current_app.config["DATASET_GENERATION_TIME"],
            dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        )

        if form_fields:
            # SHACL-driven path: validate against the configured form fields first.
            validation_errors = validate_entity_data(structured_data)
            if validation_errors:
                return jsonify({"status": "error", "errors": validation_errors}), 400

            entity_type = structured_data.get("entity_type")
            properties = structured_data.get("properties", {})

            entity_uri = generate_unique_uri(entity_type)

            import_referenced_entities(editor, structured_data)

            editor.preexisting_finished()

            # On quadstores each entity gets its own named graph.
            default_graph_uri = (
                URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
            )

            for predicate, values in properties.items():
                if not isinstance(values, list):
                    values = [values]

                entity_shape = structured_data.get("entity_shape")
                matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

                field_definitions = form_fields.get(matching_key, {}).get(predicate, []) if matching_key else []

                # Get the shape from the property value if available
                property_shape = None
                if values and isinstance(values[0], dict):
                    property_shape = values[0].get("shape")

                # Filter field definitions to find the matching one based on shape
                matching_field_def = None
                for field_def in field_definitions:
                    if property_shape:
                        # If property has a shape, match it with the field definition's subjectShape
                        if field_def.get("subjectShape") == property_shape:
                            matching_field_def = field_def
                            break
                    else:
                        # If no shape specified, use the first field definition without a shape requirement
                        if not field_def.get("subjectShape"):
                            matching_field_def = field_def
                            break

                # If no matching field definition found, use the first one (default behavior)
                if not matching_field_def and field_definitions:
                    matching_field_def = field_definitions[0]

                ordered_by = (
                    matching_field_def.get("orderedBy") if matching_field_def else None
                )

                if ordered_by:
                    process_ordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, ordered_by
                    )
                else:
                    # Handle unordered properties
                    process_unordered_properties(
                        editor, entity_uri, predicate, values, default_graph_uri, matching_field_def
                    )
        else:
            # No SHACL form fields configured: write the submitted triples as-is.
            entity_type = structured_data.get("entity_type")
            properties = structured_data.get("properties", {})

            entity_uri = generate_unique_uri(entity_type)
            editor.import_entity(entity_uri)

            import_referenced_entities(editor, structured_data)

            editor.preexisting_finished()

            default_graph_uri = (
                URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
            )

            editor.create(
                entity_uri,
                RDF.type,
                URIRef(entity_type),
                default_graph_uri,
            )

            for predicate, values in properties.items():
                for value_dict in values:
                    if value_dict["type"] == "uri":
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            URIRef(value_dict["value"]),
                            default_graph_uri,
                        )
                    elif value_dict["type"] == "literal":
                        datatype = (
                            URIRef(value_dict["datatype"])
                            if "datatype" in value_dict
                            else XSD.string
                        )
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            Literal(value_dict["value"], datatype=datatype),
                            default_graph_uri,
                        )

        try:
            editor.save()
            response = jsonify(
                {
                    "status": "success",
                    "redirect_url": url_for("entity.about", subject=str(entity_uri)),
                }
            )
            flash(gettext("Entity created successfully"), "success")
            return response, 200
        except Exception as e:
            error_message = gettext(
                "An error occurred while creating the entity: %(error)s", error=str(e)
            )
            return jsonify({"status": "error", "errors": [error_message]}), 500

    return render_template(
        "create_entity.jinja",
        form_fields=form_fields,
        datatype_options=datatype_options,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        default_primary_source=default_primary_source,
        shacl=bool(get_form_fields()),
        entity_class_shape_pairs=entity_class_shape_pairs
    )
def create_nested_entity(
    editor: Editor, entity_uri, entity_data, graph_uri=None
):
    """
    Recursively create an entity and its nested properties via the editor.

    Writes an rdf:type triple for the entity, then walks its properties,
    creating intermediate-relation entities, nested entities (recursively),
    references to existing entities, or literal/URI values as appropriate.

    Args:
        editor: Editor instance used for all triple creation.
        entity_uri: URI of the entity being created.
        entity_data: Dict with "entity_type", optional "entity_shape",
            and a "properties" mapping of predicate -> value(s).
        graph_uri: Optional named graph to write into (quadstore only).
    """
    form_fields = get_form_fields()

    editor.create(
        entity_uri,
        URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
        URIRef(entity_data["entity_type"]),
        graph_uri,
    )

    entity_type = entity_data.get("entity_type")
    entity_shape = entity_data.get("entity_shape")
    properties = entity_data.get("properties", {})

    matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    # No form field definition for this type/shape: only the rdf:type triple is written.
    if not matching_key:
        return

    # Add other properties
    for predicate, values in properties.items():
        if not isinstance(values, list):
            values = [values]
        field_definitions = form_fields[matching_key].get(predicate, [])
        for value in values:
            if isinstance(value, dict) and "entity_type" in value:
                if "intermediateRelation" in value:
                    # Link through an intermediate entity:
                    # entity -> predicate -> intermediate -> property -> target.
                    intermediate_uri = generate_unique_uri(
                        value["intermediateRelation"]["class"]
                    )
                    target_uri = generate_unique_uri(value["entity_type"])
                    editor.create(
                        entity_uri, URIRef(predicate), intermediate_uri, graph_uri
                    )
                    editor.create(
                        intermediate_uri,
                        URIRef(value["intermediateRelation"]["property"]),
                        target_uri,
                        graph_uri,
                    )
                    create_nested_entity(
                        editor, target_uri, value, graph_uri
                    )
                else:
                    # Handle nested entities
                    nested_uri = generate_unique_uri(value["entity_type"])
                    editor.create(entity_uri, URIRef(predicate), nested_uri, graph_uri)
                    create_nested_entity(
                        editor, nested_uri, value, graph_uri
                    )
            elif isinstance(value, dict) and value.get("is_existing_entity", False):
                # Reference to an already-existing entity; skip silently if no URI given.
                existing_entity_uri = value.get("entity_uri")
                if existing_entity_uri:
                    editor.create(entity_uri, URIRef(predicate), URIRef(existing_entity_uri), graph_uri)
            else:
                # Handle simple properties - check if it's a URI or literal
                if validators.url(str(value)):
                    object_value = URIRef(value)
                else:
                    datatype = XSD.string  # Default to string if not specified
                    datatype_uris = []
                    if field_definitions:
                        datatype_uris = field_definitions[0].get("datatypes", [])
                    datatype = determine_datatype(value, datatype_uris)
                    object_value = Literal(value, datatype=datatype)
                editor.create(entity_uri, URIRef(predicate), object_value, graph_uri)
def process_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri, matching_field_def):
    """
    Create the triples for one property value of an entity.

    Handles three cases: a nested entity description (dict with
    "entity_type"), a reference to an existing entity (dict with
    "is_existing_entity"), and a plain URI or literal value.

    Args:
        editor: Editor instance for RDF operations.
        entity_uri: URI of the parent entity.
        predicate: Predicate URI.
        value: Value to process (dict or primitive).
        default_graph_uri: Default graph URI for quad stores.
        matching_field_def: Field definition used for datatype resolution.

    Returns:
        The URIRef or Literal written as the object of the triple.

    Raises:
        ValueError: If an existing-entity reference lacks "entity_uri".
    """
    predicate_ref = URIRef(predicate)

    # Case 1: nested entity description — mint a URI and recurse.
    if isinstance(value, dict) and "entity_type" in value:
        child_uri = generate_unique_uri(value["entity_type"])
        editor.create(entity_uri, predicate_ref, child_uri, default_graph_uri)
        create_nested_entity(editor, child_uri, value, default_graph_uri)
        return child_uri

    # Case 2: reference to an entity that already exists in the dataset.
    if isinstance(value, dict) and value.get("is_existing_entity", False):
        referenced_uri = value.get("entity_uri")
        if not referenced_uri:
            raise ValueError("Missing entity_uri in existing entity reference")
        target = URIRef(referenced_uri)
        editor.create(entity_uri, predicate_ref, target, default_graph_uri)
        return target

    # Case 3: simple value — URI when it parses as a URL, otherwise a typed literal.
    if validators.url(str(value)):
        object_value = URIRef(value)
    else:
        allowed_datatypes = matching_field_def.get("datatypes", []) if matching_field_def else []
        object_value = Literal(value, datatype=determine_datatype(value, allowed_datatypes))
    editor.create(entity_uri, predicate_ref, object_value, default_graph_uri)
    return object_value
def process_ordered_entity_value(editor: Editor, entity_uri, predicate, value, default_graph_uri):
    """
    Process a single entity value for ordered properties.

    Args:
        editor: Editor instance for RDF operations
        entity_uri: URI of the parent entity
        predicate: Predicate URI
        value: Value to process (dict)
        default_graph_uri: Default graph URI for quad stores

    Returns:
        URIRef: The URI of the created/referenced entity

    Raises:
        ValueError: If the value is not a nested entity or a valid
            existing-entity reference.
    """
    if isinstance(value, dict) and "entity_type" in value:
        # Nested entity: mint a new URI, link it, and create it recursively.
        nested_uri = generate_unique_uri(value["entity_type"])
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        create_nested_entity(
            editor,
            nested_uri,
            value,
            default_graph_uri
        )
        return nested_uri
    elif isinstance(value, dict) and value.get("is_existing_entity", False):
        # Reference to an existing entity. BUGFIX: the previous code wrapped
        # the whole dict in URIRef(value), which stringified the dict into a
        # malformed URI; use the "entity_uri" key as process_entity_value does.
        existing_uri = value.get("entity_uri")
        if not existing_uri:
            raise ValueError("Missing entity_uri in existing entity reference")
        nested_uri = URIRef(existing_uri)
        editor.create(
            entity_uri,
            URIRef(predicate),
            nested_uri,
            default_graph_uri,
        )
        return nested_uri
    else:
        raise ValueError("Unexpected value type for ordered property")
def process_ordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, ordered_by):
    """
    Create ordered property values, chaining entities of the same shape.

    Values are grouped by their "entity_shape" (falling back to a default
    bucket), and within each group every entity is linked to its successor
    through the `ordered_by` predicate, preserving input order.

    Args:
        editor: Editor instance for RDF operations.
        entity_uri: URI of the parent entity.
        predicate: Predicate URI.
        values: List of values to process.
        default_graph_uri: Default graph URI for quad stores.
        ordered_by: URI of the ordering property.
    """
    # Group values by shape; insertion order of groups and members is preserved.
    grouped_by_shape = {}
    for item in values:
        bucket = item.get("entity_shape") or "default_shape"
        grouped_by_shape.setdefault(bucket, []).append(item)

    ordering_ref = URIRef(ordered_by)
    for members in grouped_by_shape.values():
        predecessor = None
        for item in members:
            current = process_ordered_entity_value(
                editor, entity_uri, predicate, item, default_graph_uri
            )
            # Chain each entity to the one created before it.
            if predecessor:
                editor.create(
                    predecessor,
                    ordering_ref,
                    current,
                    default_graph_uri,
                )
            predecessor = current
def process_unordered_properties(editor: Editor, entity_uri, predicate, values, default_graph_uri, matching_field_def):
    """
    Create unordered property values, one triple group per value.

    Args:
        editor: Editor instance for RDF operations.
        entity_uri: URI of the parent entity.
        predicate: Predicate URI.
        values: List of values to process.
        default_graph_uri: Default graph URI for quad stores.
        matching_field_def: Field definition used for datatype resolution.
    """
    for item in values:
        process_entity_value(
            editor, entity_uri, predicate, item, default_graph_uri, matching_field_def
        )
def determine_datatype(value, datatype_uris):
    """
    Return the first candidate datatype URI whose validator accepts `value`.

    Each candidate is looked up in DATATYPE_MAPPING (by string-equal URI) to
    find its validation function; the first candidate that validates wins.
    Falls back to XSD.string when nothing matches.
    """
    for candidate_uri in datatype_uris:
        validation_func = None
        for mapping in DATATYPE_MAPPING:
            if str(mapping[0]) == str(candidate_uri):
                validation_func = mapping[1]
                break
        if validation_func and validation_func(value):
            return URIRef(candidate_uri)
    # If none match, default to XSD.string
    return XSD.string
def validate_entity_data(structured_data):
    """
    Validates entity data against form field definitions, considering shape matching.

    Checks, in order: presence of entity_type, existence of a matching form
    field key, per-property cardinality (min/max), mandatory and permitted
    values, datatype validity of literals (recursing into nested entities),
    and finally required properties that are absent from the input entirely.

    Args:
        structured_data (dict): Data to validate containing entity_type and properties

    Returns:
        list: List of validation error messages, empty if validation passes
    """
    custom_filter = get_custom_filter()
    form_fields = get_form_fields()

    errors = []
    entity_type = structured_data.get("entity_type")
    entity_shape = structured_data.get("entity_shape")

    if not entity_type:
        errors.append(gettext("Entity type is required"))
        return errors

    entity_key = find_matching_form_field(entity_type, entity_shape, form_fields)

    if not entity_key:
        errors.append(f"No form fields found for entity type: {entity_type}" +
                     (f" and shape: {entity_shape}" if entity_shape else ""))
        return errors

    entity_fields = form_fields[entity_key]
    properties = structured_data.get("properties", {})

    for prop_uri, prop_values in properties.items():
        # rdf:type is written separately and is not a form field.
        if URIRef(prop_uri) == RDF.type:
            continue

        field_definitions = entity_fields.get(prop_uri)
        if not field_definitions:
            errors.append(
                gettext(
                    "Unknown property %(prop_uri)s for entity type %(entity_type)s",
                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                    entity_type=custom_filter.human_readable_class(entity_key),
                )
            )
            continue

        if not isinstance(prop_values, list):
            prop_values = [prop_values]

        # The shape declared on the first value selects among multiple
        # field definitions for the same predicate.
        property_shape = None
        if prop_values and isinstance(prop_values[0], dict):
            property_shape = prop_values[0].get("shape")

        matching_field_def = None
        for field_def in field_definitions:
            if property_shape:
                if field_def.get("subjectShape") == property_shape:
                    matching_field_def = field_def
                    break
            else:
                if not field_def.get("subjectShape"):
                    matching_field_def = field_def
                    break

        # Fall back to the first definition when no shape-specific match exists.
        if not matching_field_def and field_definitions:
            matching_field_def = field_definitions[0]

        if matching_field_def:
            # Cardinality check for values that are present.
            min_count = matching_field_def.get("min", 0)
            max_count = matching_field_def.get("max", None)
            value_count = len(prop_values)

            if value_count < min_count:
                value = gettext("values") if min_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s requires at least %(min_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        min_count=min_count,
                        value=value,
                    )
                )
            if max_count is not None and value_count > max_count:
                value = gettext("values") if max_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s allows at most %(max_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                        max_count=max_count,
                        value=value,
                    )
                )

            mandatory_values = matching_field_def.get("mandatory_values", [])
            for mandatory_value in mandatory_values:
                if mandatory_value not in prop_values:
                    errors.append(
                        gettext(
                            "Property %(prop_uri)s requires the value %(mandatory_value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            mandatory_value=mandatory_value,
                        )
                    )

            for value in prop_values:
                if isinstance(value, dict) and "entity_type" in value:
                    # Nested entity: validate it recursively with its own form fields.
                    nested_errors = validate_entity_data(value)
                    errors.extend(nested_errors)
                else:
                    # Literal value: must satisfy at least one declared datatype.
                    datatypes = matching_field_def.get("datatypes", [])
                    if datatypes:
                        is_valid_datatype = False
                        for dtype in datatypes:
                            validation_func = next(
                                (
                                    d[1]
                                    for d in DATATYPE_MAPPING
                                    if d[0] == URIRef(dtype)
                                ),
                                None,
                            )
                            if validation_func and validation_func(value):
                                is_valid_datatype = True
                                break
                        if not is_valid_datatype:
                            expected_types = ", ".join(
                                [
                                    custom_filter.human_readable_predicate(dtype, entity_key)
                                    for dtype in datatypes
                                ]
                            )
                            errors.append(
                                gettext(
                                    'Value "%(value)s" for property %(prop_uri)s is not of expected type %(expected_types)s',
                                    value=value,
                                    prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                    expected_types=expected_types
                                )
                            )

                    # Closed value list: the value must be one of the permitted options.
                    optional_values = matching_field_def.get("optionalValues", [])
                    if optional_values and value not in optional_values:
                        acceptable_values = ", ".join(
                            [
                                custom_filter.human_readable_predicate(val, entity_key)
                                for val in optional_values
                            ]
                        )
                        errors.append(
                            gettext(
                                'Value "%(value)s" is not permitted for property %(prop_uri)s. Acceptable values are: %(acceptable_values)s',
                                value=value,
                                prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                                acceptable_values=acceptable_values
                            )
                        )

    # In the RDF model, a property with zero values is equivalent to the property being absent,
    # as a triple requires a subject, predicate, and object. Therefore, this section checks for
    # properties defined in the schema that are completely absent from the input data but are
    # required (min_count > 0). This complements the cardinality check above, which only
    # validates properties that are present in the data.
    # Check for missing required properties
    for prop_uri, field_definitions in entity_fields.items():
        if prop_uri not in properties:
            for field_def in field_definitions:
                min_count = field_def.get("min", 0)
                if min_count > 0:
                    value = gettext("values") if min_count > 1 else gettext("value")
                    errors.append(
                        gettext(
                            "Missing required property: %(prop_uri)s requires at least %(min_count)d %(value)s",
                            prop_uri=custom_filter.human_readable_predicate(prop_uri, entity_key),
                            min_count=min_count,
                            value=value,
                        )
                    )
                    break  # Only need to report once per property

    return errors
@entity_bp.route("/entity-history/<path:entity_uri>")
@login_required
def entity_history(entity_uri):
    """
    Display the history of changes for an entity.

    Builds a timeline of snapshots from the entity's provenance, formatting
    for each one the responsible agent, primary source, description, and the
    modifications parsed from its SPARQL update query.

    Args:
        entity_uri: URI of the entity
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    # Full history including related/merged/reverse entities, needed to
    # describe merges and render modification details.
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    sorted_metadata = sorted(
        provenance[entity_uri].items(),
        key=lambda x: convert_to_datetime(x[1]["generatedAtTime"]),
    )
    sorted_timestamps = [
        convert_to_datetime(meta["generatedAtTime"], stringify=True)
        for _, meta in sorted_metadata
    ]

    # Get correct context for entity label
    latest_metadata = sorted_metadata[-1][1] if sorted_metadata else None
    is_latest_deletion = (
        latest_metadata
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    )
    # For deleted entities, label from the snapshot before deletion.
    if is_latest_deletion and len(sorted_timestamps) > 1:
        context_snapshot = history[entity_uri][sorted_timestamps[-2]]
    else:
        context_snapshot = history[entity_uri][sorted_timestamps[-1]]

    entity_classes = [str(triple[2]) for triple in context_snapshot.triples((URIRef(entity_uri), RDF.type, None))]
    highest_priority_class = get_highest_priority_class(entity_classes)

    snapshot_entity_shape = determine_shape_for_entity_triples(
        list(context_snapshot.triples((URIRef(entity_uri), None, None)))
    )

    # Generate timeline events
    events = []
    for i, (snapshot_uri, metadata) in enumerate(sorted_metadata):
        date = convert_to_datetime(metadata["generatedAtTime"])
        snapshot_timestamp_str = convert_to_datetime(
            metadata["generatedAtTime"], stringify=True
        )
        snapshot_graph = history[entity_uri][snapshot_timestamp_str]

        responsible_agent = custom_filter.format_agent_reference(
            metadata["wasAttributedTo"]
        )
        primary_source = custom_filter.format_source_reference(
            metadata["hadPrimarySource"]
        )

        description = _format_snapshot_description(
            metadata,
            entity_uri,
            highest_priority_class,
            context_snapshot,
            history,
            sorted_timestamps,
            i,
            custom_filter,
        )
        # Render human-readable modification details from the snapshot's update query.
        modifications = metadata.get("hasUpdateQuery", "")
        modification_text = ""
        if modifications:
            parsed_modifications = parse_sparql_update(modifications)
            modification_text = generate_modification_text(
                parsed_modifications,
                highest_priority_class,
                snapshot_entity_shape,
                history=history,
                entity_uri=entity_uri,
                current_snapshot=snapshot_graph,
                current_snapshot_timestamp=snapshot_timestamp_str,
                custom_filter=custom_filter,
            )

        # Check if this version can be restored (not the latest version and there are multiple versions)
        can_restore = len(sorted_metadata) > 1 and i + 1 < len(sorted_metadata)
        restore_button = ""
        if can_restore:
            restore_button = f"""
                <form action='/restore-version/{entity_uri}/{metadata["generatedAtTime"]}' method='post' class='d-inline restore-form'>
                    <button type='submit' class='btn btn-success restore-btn'>
                        <i class='bi bi-arrow-counterclockwise me-1'></i>{gettext('Restore')}
                    </button>
                </form>
            """

        event = {
            "start_date": {
                "year": date.year,
                "month": date.month,
                "day": date.day,
                "hour": date.hour,
                "minute": date.minute,
                "second": date.second,
            },
            "text": {
                "headline": gettext("Snapshot") + " " + str(i + 1),
                "text": f"""
                    <p><strong>{gettext('Responsible agent')}:</strong> {responsible_agent}</p>
                    <p><strong>{gettext('Primary source')}:</strong> {primary_source}</p>
                    <p><strong>{gettext('Description')}:</strong> {description}</p>
                    <div class="modifications mb-3">
                        {modification_text}
                    </div>
                    <div class="d-flex gap-2 mt-2">
                        <a href='/entity-version/{entity_uri}/{metadata["generatedAtTime"]}' class='btn btn-outline-primary view-version' target='_self'>{gettext('View version')}</a>
                        {restore_button}
                    </div>
                """,
            },
            "autolink": False,
        }

        # Close the event's time span at the next snapshot's generation time.
        if i + 1 < len(sorted_metadata):
            next_date = convert_to_datetime(
                sorted_metadata[i + 1][1]["generatedAtTime"]
            )
            event["end_date"] = {
                "year": next_date.year,
                "month": next_date.month,
                "day": next_date.day,
                "hour": next_date.hour,
                "minute": next_date.minute,
                "second": next_date.second,
            }

        events.append(event)

    entity_label = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, snapshot_entity_shape), context_snapshot
    )

    timeline_data = {
        "entityUri": entity_uri,
        "entityLabel": entity_label,
        "entityClasses": list(entity_classes),
        "entityShape": snapshot_entity_shape,
        "events": events,
    }

    return render_template("entity/history.jinja", timeline_data=timeline_data)
def _format_snapshot_description(
    metadata: dict,
    entity_uri: str,
    highest_priority_class: str,
    context_snapshot: Graph,
    history: dict,
    sorted_timestamps: list[str],
    current_index: int,
    custom_filter: Filter,
) -> str:
    """
    Format a snapshot description, replacing raw URIs with readable labels.

    For a merge snapshot (one derived from more than one previous snapshot)
    the URI mentioned after "merged with" is replaced by the merged entity's
    label, resolved against the snapshot preceding the merge. The main
    entity's own quoted URI is replaced by its label as well.

    Args:
        metadata: The snapshot metadata dictionary.
        entity_uri: The URI of the main entity.
        highest_priority_class: The highest priority class for the entity.
        context_snapshot: The graph snapshot for context.
        history: The history dictionary containing snapshots.
        sorted_timestamps: Sorted list of snapshot timestamps.
        current_index: The index of the current snapshot in sorted_timestamps.
        custom_filter: The custom filter instance for formatting.

    Returns:
        The formatted description string.
    """
    description = metadata.get("description", "")

    # A merge snapshot is derived from more than one predecessor snapshot.
    was_derived_from = metadata.get('wasDerivedFrom')
    is_merge_snapshot = isinstance(was_derived_from, list) and len(was_derived_from) > 1

    if is_merge_snapshot:
        # Regex to find URI after "merged with", potentially enclosed in single quotes or none
        match = re.search(r"merged with ['‘]?([^'’<>\s]+)['’]?", description)
        if match:
            potential_merged_uri = match.group(1)
            if validators.url(potential_merged_uri):
                merged_entity_uri_from_desc = potential_merged_uri
                merged_entity_label = None
                # The merged entity's data only exists in the snapshot *before*
                # the merge, so resolve its label from the previous snapshot.
                if current_index > 0:
                    previous_snapshot_timestamp = sorted_timestamps[current_index - 1]
                    previous_snapshot_graph = history.get(entity_uri, {}).get(previous_snapshot_timestamp)
                    if previous_snapshot_graph:
                        raw_merged_entity_classes = [
                            str(o)
                            for s, p, o in previous_snapshot_graph.triples(
                                (URIRef(merged_entity_uri_from_desc), RDF.type, None)
                            )
                        ]
                        highest_priority_merged_class = get_highest_priority_class(
                            raw_merged_entity_classes
                        ) if raw_merged_entity_classes else None
                        shape = determine_shape_for_classes(raw_merged_entity_classes)
                        merged_entity_label = custom_filter.human_readable_entity(
                            merged_entity_uri_from_desc,
                            (highest_priority_merged_class, shape),
                            previous_snapshot_graph,
                        )
                        # Substitute only when a real label (not the bare URI)
                        # was found.
                        if (
                            merged_entity_label
                            and merged_entity_label != merged_entity_uri_from_desc
                        ):
                            description = description.replace(
                                match.group(0), f"merged with '{merged_entity_label}'"
                            )

    # Replace the main entity's quoted URI with its label, if one exists.
    shape = determine_shape_for_classes([highest_priority_class])
    entity_label_for_desc = custom_filter.human_readable_entity(
        entity_uri, (highest_priority_class, shape), context_snapshot
    )
    if entity_label_for_desc and entity_label_for_desc != entity_uri:
        description = description.replace(f"'{entity_uri}'", f"'{entity_label_for_desc}'")

    return description
@entity_bp.route("/entity-version/<path:entity_uri>/<timestamp>")
@login_required
def entity_version(entity_uri, timestamp):
    """
    Display a specific version of an entity.

    Args:
        entity_uri: URI of the entity
        timestamp: Timestamp of the version to display (ISO format, or a
            snapshot identifier whose generation time is resolved from the
            provenance store)
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    try:
        timestamp_dt = datetime.fromisoformat(timestamp)
    except ValueError:
        # Not an ISO timestamp: treat the path segment as a snapshot
        # identifier and look up its generation time in the provenance store.
        provenance_sparql = get_provenance_sparql()
        query_timestamp = f"""
        SELECT ?generation_time
        WHERE {{
            <{entity_uri}/prov/se/{timestamp}> <http://www.w3.org/ns/prov#generatedAtTime> ?generation_time.
        }}
        """
        provenance_sparql.setQuery(query_timestamp)
        provenance_sparql.setReturnFormat(JSON)
        try:
            generation_time = provenance_sparql.queryAndConvert()["results"][
                "bindings"
            ][0]["generation_time"]["value"]
        except IndexError:
            # No snapshot with that identifier exists.
            abort(404)
        timestamp = generation_time
        timestamp_dt = datetime.fromisoformat(generation_time)

    # Full history (related objects, merged entities and reverse relations
    # included) plus provenance metadata for every snapshot.
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)
    main_entity_history = history.get(entity_uri, {})
    sorted_timestamps = sorted(
        main_entity_history.keys(), key=lambda t: convert_to_datetime(t)
    )

    if not sorted_timestamps:
        abort(404)

    # Pick the stored snapshot closest in time to the requested timestamp.
    closest_timestamp = min(
        sorted_timestamps,
        key=lambda t: abs(
            convert_to_datetime(t).astimezone() - timestamp_dt.astimezone()
        ),
    )

    version = main_entity_history[closest_timestamp]
    triples = list(version.triples((URIRef(entity_uri), None, None)))

    entity_metadata = provenance.get(entity_uri, {})
    closest_metadata = None
    min_time_diff = None

    # NOTE(review): latest_timestamp is the lexicographic max of the history
    # keys and is compared below with provenance generatedAtTime strings —
    # assumes both use the same ISO format; verify.
    latest_timestamp = max(sorted_timestamps)
    latest_metadata = None

    # Find the provenance record closest to the requested time, plus the one
    # for the latest snapshot (needed to detect deletions).
    for se_uri, meta in entity_metadata.items():
        meta_time = convert_to_datetime(meta["generatedAtTime"])
        time_diff = abs((meta_time - timestamp_dt).total_seconds())

        if closest_metadata is None or time_diff < min_time_diff:
            closest_metadata = meta
            min_time_diff = time_diff

        if meta["generatedAtTime"] == latest_timestamp:
            latest_metadata = meta

    if closest_metadata is None or latest_metadata is None:
        abort(404)

    # A deletion snapshot is either the invalidated latest snapshot or any
    # snapshot that holds no triples for the entity.
    is_deletion_snapshot = (
        closest_timestamp == latest_timestamp
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    ) or len(triples) == 0

    # For a deletion snapshot, derive context (classes, labels) from the
    # previous snapshot, which still contains the entity's data.
    context_version = version
    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        current_index = sorted_timestamps.index(closest_timestamp)
        if current_index > 0:
            context_version = main_entity_history[sorted_timestamps[current_index - 1]]

    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        subject_classes = [
            o
            for _, _, o in context_version.triples((URIRef(entity_uri), RDF.type, None))
        ]
    else:
        subject_classes = [
            o for _, _, o in version.triples((URIRef(entity_uri), RDF.type, None))
        ]

    highest_priority_class = get_highest_priority_class(subject_classes)

    entity_shape = determine_shape_for_entity_triples(
        list(context_version.triples((URIRef(entity_uri), None, None)))
    )

    _, _, _, _, _, valid_predicates = get_valid_predicates(triples, highest_priority_class=highest_priority_class)

    grouped_triples, relevant_properties = get_grouped_triples(
        entity_uri,
        triples,
        valid_predicates,
        historical_snapshot=context_version,
        highest_priority_class=highest_priority_class,
        highest_priority_shape=entity_shape
    )

    # 1-based chronological version number of this snapshot.
    snapshot_times = [
        convert_to_datetime(meta["generatedAtTime"])
        for meta in entity_metadata.values()
    ]
    snapshot_times = sorted(set(snapshot_times))
    version_number = snapshot_times.index(timestamp_dt) + 1

    # Neighbouring snapshot timestamps, for prev/next navigation links.
    next_snapshot_timestamp = None
    prev_snapshot_timestamp = None

    for snap_time in snapshot_times:
        if snap_time > timestamp_dt:
            next_snapshot_timestamp = snap_time.isoformat()
            break

    for snap_time in reversed(snapshot_times):
        if snap_time < timestamp_dt:
            prev_snapshot_timestamp = snap_time.isoformat()
            break

    # Human-readable summary of what this snapshot's update query changed.
    modifications = ""
    if closest_metadata.get("hasUpdateQuery"):
        sparql_query = closest_metadata["hasUpdateQuery"]
        parsed_modifications = parse_sparql_update(sparql_query)
        modifications = generate_modification_text(
            parsed_modifications,
            highest_priority_class,
            entity_shape,
            history,
            entity_uri,
            context_version,
            closest_timestamp,
            custom_filter,
        )

    try:
        current_index = sorted_timestamps.index(closest_timestamp)
    except ValueError:
        current_index = -1

    # Rewrite raw URIs in the provenance description into readable labels.
    if closest_metadata.get("description"):
        formatted_description = _format_snapshot_description(
            closest_metadata,
            entity_uri,
            highest_priority_class,
            context_version,
            history,
            sorted_timestamps,
            current_index,
            custom_filter,
        )
        closest_metadata["description"] = formatted_description

    closest_timestamp = closest_metadata["generatedAtTime"]

    return render_template(
        "entity/version.jinja",
        subject=entity_uri,
        entity_type=highest_priority_class,
        entity_shape=entity_shape,
        metadata={closest_timestamp: closest_metadata},
        timestamp=closest_timestamp,
        next_snapshot_timestamp=next_snapshot_timestamp,
        prev_snapshot_timestamp=prev_snapshot_timestamp,
        modifications=modifications,
        grouped_triples=grouped_triples,
        version_number=version_number,
        version=context_version,
    )
@entity_bp.route("/restore-version/<path:entity_uri>/<timestamp>", methods=["POST"])
@login_required
def restore_version(entity_uri, timestamp):
    """
    Restore an entity to a previous version.

    Computes the difference between the current state and the chosen
    historical snapshot, applies it through an Editor (which records
    provenance), and marks deleted related entities as restored.

    Args:
        entity_uri: URI of the entity to restore
        timestamp: Timestamp of the version to restore to
    """
    # Normalise the timestamp to the string form used as history key.
    timestamp = convert_to_datetime(timestamp, stringify=True)
    change_tracking_config = get_change_tracking_config()

    # Get entity history
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, include_related_objects=True, include_merged_entities=True, include_reverse_relations=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    historical_graph = history.get(entity_uri, {}).get(timestamp)
    if historical_graph is None:
        abort(404)

    current_graph = fetch_current_state_with_related_entities(provenance)

    # The entity counts as deleted when the current state holds no triples
    # with it as subject.
    is_deleted = len(list(current_graph.triples((URIRef(entity_uri), None, None)))) == 0

    triples_or_quads_to_delete, triples_or_quads_to_add = compute_graph_differences(
        current_graph, historical_graph
    )

    # Get all entities that need restoration
    entities_to_restore = get_entities_to_restore(
        triples_or_quads_to_delete, triples_or_quads_to_add, entity_uri
    )

    # Prepare snapshot information for all entities
    entity_snapshots = prepare_entity_snapshots(
        entities_to_restore, provenance, timestamp
    )

    # Create editor instance
    editor = Editor(
        get_dataset_endpoint(),
        get_provenance_endpoint(),
        current_app.config["COUNTER_HANDLER"],
        URIRef(get_responsible_agent_uri(current_user.orcid)),
        None if is_deleted else entity_snapshots[entity_uri]["source"],
        current_app.config["DATASET_GENERATION_TIME"],
        dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
    )

    # Import current state into editor
    if get_dataset_is_quadstore():
        for quad in current_graph.quads():
            editor.g_set.add(quad)
    else:
        for triple in current_graph:
            editor.g_set.add(triple)

    editor.preexisting_finished()

    # Apply deletions
    for item in triples_or_quads_to_delete:
        # Items are quads in a quadstore, triples otherwise.
        if len(item) == 4:
            editor.delete(item[0], item[1], item[2], item[3])
        else:
            editor.delete(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                # NOTE(review): deletions record the source under
                # "restoration_source" while additions below use "source" —
                # confirm this asymmetry is intentional.
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["restoration_source"] = (
                    entity_info["source"]
                )

    # Apply additions
    for item in triples_or_quads_to_add:
        if len(item) == 4:
            editor.create(item[0], item[1], item[2], item[3])
        else:
            editor.create(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["source"] = entity_info[
                    "source"
                ]

    # Handle main entity restoration if needed
    if is_deleted and entity_uri in entity_snapshots:
        editor.g_set.mark_as_restored(URIRef(entity_uri))
        source = entity_snapshots[entity_uri]["source"]
        editor.g_set.entity_index[URIRef(entity_uri)]["source"] = source

    try:
        editor.save()
        flash(gettext("Version restored successfully"), "success")
    except Exception as e:
        flash(
            gettext(
                "An error occurred while restoring the version: %(error)s", error=str(e)
            ),
            "error",
        )

    return redirect(url_for("entity.about", subject=entity_uri))
def compute_graph_differences(
    current_graph: Graph | ConjunctiveGraph, historical_graph: Graph | ConjunctiveGraph
):
    """Return the (to_delete, to_add) difference between two graph states.

    In a quadstore the comparison is done on quads, otherwise on triples.
    """
    if get_dataset_is_quadstore():
        now_state = set(current_graph.quads())
        then_state = set(historical_graph.quads())
    else:
        now_state = set(current_graph.triples((None, None, None)))
        then_state = set(historical_graph.triples((None, None, None)))

    # Present now but not then -> delete; present then but not now -> add.
    return now_state - then_state, then_state - now_state
def get_entities_to_restore(
    triples_or_quads_to_delete: set, triples_or_quads_to_add: set, main_entity_uri: str
) -> set:
    """
    Identify all entities that need to be restored based on the graph differences.

    Args:
        triples_or_quads_to_delete: Set of triples/quads to be deleted
        triples_or_quads_to_add: Set of triples/quads to be added
        main_entity_uri: URI of the main entity being restored

    Returns:
        Set of entity URIs that need to be restored
    """
    entities_to_restore = {main_entity_uri}

    # Iterate the set union directly instead of concatenating two throwaway
    # lists; both arguments are sets, so the resulting entity set is unchanged.
    for item in triples_or_quads_to_delete | triples_or_quads_to_add:
        # rdf:type statements never reference restorable related entities.
        predicate = str(item[1])
        if predicate == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
            continue

        subject = str(item[0])
        obj = str(item[2])
        for uri in [subject, obj]:
            # Only URI nodes other than the main entity are related entities.
            if uri != main_entity_uri and validators.url(uri):
                entities_to_restore.add(uri)

    return entities_to_restore
def prepare_entity_snapshots(
    entities_to_restore: set, provenance: dict, target_time: str
) -> dict:
    """
    Build restoration information for every entity involved in a restore.

    Args:
        entities_to_restore: Set of entity URIs to process
        provenance: Dictionary containing provenance data for all entities
        target_time: Target restoration time

    Returns:
        Dictionary mapping entity URIs to their restoration information:
        {"source": snapshot URI, "needs_restore": truthy when deleted}
    """
    snapshots_by_entity = {}

    for uri in entities_to_restore:
        prov_data = provenance.get(uri)
        if prov_data is None:
            continue

        # Entities without a usable source snapshot are skipped entirely.
        source_snapshot = find_appropriate_snapshot(prov_data, target_time)
        if not source_snapshot:
            continue

        # An entity is currently deleted when its newest snapshot was
        # invalidated at the same instant it was generated.
        ordered_metadata = sorted(
            prov_data.values(),
            key=lambda meta: convert_to_datetime(meta["generatedAtTime"]),
        )
        newest = ordered_metadata[-1]
        currently_deleted = (
            newest.get("invalidatedAtTime")
            and newest["generatedAtTime"] == newest["invalidatedAtTime"]
        )

        snapshots_by_entity[uri] = {
            "source": source_snapshot,
            "needs_restore": currently_deleted,
        }

    return snapshots_by_entity
def find_appropriate_snapshot(provenance_data: dict, target_time: str) -> Optional[str]:
    """
    Find the most appropriate snapshot to use as a source for restoration.

    Args:
        provenance_data: Dictionary of snapshots and their metadata for an entity
        target_time: The target restoration time as ISO format string

    Returns:
        The URI of the most recent non-deletion snapshot generated at or
        before the target time, or None when no such snapshot exists.
    """
    target_datetime = convert_to_datetime(target_time)

    def _is_deletion(meta):
        # A deletion snapshot is generated and invalidated at the same instant.
        return bool(
            meta.get("invalidatedAtTime")
            and meta["generatedAtTime"] == meta["invalidatedAtTime"]
        )

    candidates = []
    for snapshot_uri, metadata in provenance_data.items():
        if _is_deletion(metadata):
            continue
        generation_time = convert_to_datetime(metadata["generatedAtTime"])
        # Only snapshots at or before the target time are usable sources.
        if generation_time <= target_datetime:
            candidates.append((generation_time, snapshot_uri))

    if not candidates:
        return None

    # Stable sort on generation time; the last entry is the most recent.
    candidates.sort(key=lambda pair: pair[0])
    return candidates[-1][1]
def determine_object_class_and_shape(object_value: str, relevant_snapshot: Graph) -> tuple[Optional[str], Optional[str]]:
    """
    Determine the class and shape for an object value from a graph snapshot.

    Args:
        object_value: The object value (URI or literal)
        relevant_snapshot: Graph snapshot to query for object information

    Returns:
        Tuple of (object_class, object_shape_uri) or (None, None) if not determinable
    """
    # Literals, and lookups without a context graph, carry no class/shape.
    if not validators.url(str(object_value)) or not relevant_snapshot:
        return None, None

    subject_ref = URIRef(object_value)
    describing_triples = list(relevant_snapshot.triples((subject_ref, None, None)))
    if not describing_triples:
        return None, None

    shape_uri = determine_shape_for_entity_triples(describing_triples)

    rdf_types = [
        str(obj)
        for _, _, obj in relevant_snapshot.triples((subject_ref, RDF.type, None))
    ]
    top_class = get_highest_priority_class(rdf_types) if rdf_types else None

    return top_class, shape_uri
def generate_modification_text(
    modifications,
    highest_priority_class,
    entity_shape,
    history,
    entity_uri,
    current_snapshot,
    current_snapshot_timestamp,
    custom_filter: Filter,
) -> str:
    """
    Generate HTML text describing modifications to an entity, using display rules for property ordering.

    Args:
        modifications (dict): Dictionary of modifications from parse_sparql_update
        highest_priority_class (str): The highest priority class for the subject entity
        entity_shape (str): The shape for the subject entity
        history (dict): Historical snapshots dictionary
        entity_uri (str): URI of the entity being modified
        current_snapshot (Graph): Current entity snapshot
        current_snapshot_timestamp (str): Timestamp of current snapshot
        custom_filter (Filter): Filter instance for formatting

    Returns:
        str: HTML text describing the modifications
    """
    modification_text = "<p><strong>" + gettext("Modifications") + "</strong></p>"

    # Predicate display order defined by the display rules for this
    # class/shape combination.
    ordered_properties = get_property_order_from_rules(highest_priority_class, entity_shape)

    for mod_type, triples in modifications.items():
        modification_text += "<ul class='list-group mb-3'><p>"
        if mod_type == gettext("Additions"):
            modification_text += '<i class="bi bi-plus-circle-fill text-success"></i>'
        elif mod_type == gettext("Deletions"):
            modification_text += '<i class="bi bi-dash-circle-fill text-danger"></i>'
        # NOTE(review): mod_type is already a translated label; the second
        # gettext pass only matters if the translated text is itself a msgid —
        # confirm.
        modification_text += " <em>" + gettext(mod_type) + "</em></p>"

        # Per-modification-type caches: str(object) -> shape / class.
        object_shapes_cache = {}
        object_classes_cache = {}

        # For deletions, labels must come from the snapshot *before* this one,
        # because the deleted objects may no longer exist in the current state.
        relevant_snapshot = None
        if (
            mod_type == gettext("Deletions")
            and history
            and entity_uri
            and current_snapshot_timestamp
        ):
            sorted_timestamps = sorted(history[entity_uri].keys())
            current_index = sorted_timestamps.index(current_snapshot_timestamp)
            if current_index > 0:
                relevant_snapshot = history[entity_uri][
                    sorted_timestamps[current_index - 1]
                ]
        else:
            relevant_snapshot = current_snapshot

        if relevant_snapshot:
            # Pre-compute class and shape for every object in one pass.
            for triple in triples:
                object_value = triple[2]
                object_class, object_shape = determine_object_class_and_shape(object_value, relevant_snapshot)
                object_classes_cache[str(object_value)] = object_class
                object_shapes_cache[str(object_value)] = object_shape

        # Group triples by (predicate, object shape); cache per-predicate
        # ordering info and per-object sequence positions.
        predicate_shape_groups = {}
        predicate_ordering_cache = {}
        entity_position_cache = {}

        for triple in triples:
            predicate = str(triple[1])
            object_value = str(triple[2])
            object_shape_uri = object_shapes_cache.get(object_value)

            if predicate not in predicate_ordering_cache:
                predicate_ordering_cache[predicate] = get_predicate_ordering_info(predicate, highest_priority_class, entity_shape)

            # For ordered predicates, resolve each URI object's position in
            # the sequence once and cache it under (object, predicate).
            order_property = predicate_ordering_cache[predicate]
            if order_property and validators.url(object_value) and relevant_snapshot:
                position_key = (object_value, predicate)
                if position_key not in entity_position_cache:
                    entity_position_cache[position_key] = get_entity_position_in_sequence(
                        object_value, entity_uri, predicate, order_property, relevant_snapshot
                    )

            group_key = (predicate, object_shape_uri)
            if group_key not in predicate_shape_groups:
                predicate_shape_groups[group_key] = []
            predicate_shape_groups[group_key].append(triple)

        processed_predicates = set()

        def get_cached_position(triple, predicate_uri):
            # Triples without a cached position sort last.
            object_value = str(triple[2])
            position_key = (object_value, predicate_uri)
            return entity_position_cache.get(position_key, float('inf'))

        # First emit predicates in the order dictated by the display rules,
        # sub-ordering each predicate's groups by shape priority.
        for predicate in ordered_properties:
            shape_order = get_shape_order_from_display_rules(highest_priority_class, entity_shape, predicate)
            predicate_groups = []
            for group_key, group_triples in predicate_shape_groups.items():
                predicate_uri, object_shape_uri = group_key
                if predicate_uri == predicate:
                    if object_shape_uri and object_shape_uri in shape_order:
                        shape_priority = shape_order.index(object_shape_uri)
                    else:
                        # Objects without shapes or shapes not in display rules go at the end
                        shape_priority = len(shape_order)

                    predicate_groups.append((shape_priority, group_key, group_triples))

            predicate_groups.sort(key=lambda x: x[0])

            for _, group_key, group_triples in predicate_groups:
                processed_predicates.add(group_key)

                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                # Within an ordered predicate, list objects by their position
                # in the sequence.
                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        # Then handle any remaining predicate+shape groups not in the ordered list
        for group_key, group_triples in predicate_shape_groups.items():
            if group_key not in processed_predicates:
                # Sort remaining triples by their cached positions too
                predicate_uri, _ = group_key
                order_property = predicate_ordering_cache.get(predicate_uri)

                if order_property and relevant_snapshot:
                    group_triples = sorted(group_triples, key=lambda t: get_cached_position(t, predicate_uri))

                for triple in group_triples:
                    modification_text += format_triple_modification(
                        triple,
                        highest_priority_class,
                        entity_shape,
                        object_shapes_cache,
                        object_classes_cache,
                        relevant_snapshot,
                        custom_filter,
                        subject_uri=entity_uri,
                        predicate_ordering_cache=predicate_ordering_cache,
                        entity_position_cache=entity_position_cache,
                    )

        modification_text += "</ul>"

    return modification_text
def format_triple_modification(
    triple: Tuple[URIRef, URIRef, URIRef|Literal],
    highest_priority_class: str,
    entity_shape: str,
    object_shapes_cache: dict,
    object_classes_cache: dict,
    relevant_snapshot: Optional[Graph],
    custom_filter: Filter,
    subject_uri: str = None,
    predicate_ordering_cache: Optional[dict] = None,
    entity_position_cache: Optional[dict] = None,
) -> str:
    """
    Render one added/removed triple as an HTML list item.

    Args:
        triple: The RDF triple being modified
        highest_priority_class: The highest priority class for the subject entity
        entity_shape: The shape for the subject entity
        object_shapes_cache: Pre-computed cache of object shapes
        object_classes_cache: Pre-computed cache of object classes
        relevant_snapshot: Graph snapshot for context
        custom_filter: Filter instance for formatting
        subject_uri: URI of the subject entity (for ordering badges)
        predicate_ordering_cache: Maps predicate URI -> ordering property (or None)
        entity_position_cache: Maps (object URI, predicate URI) -> sequence position

    Returns:
        str: HTML fragment describing the modification
    """
    _, predicate, object_value = triple
    object_key = str(object_value)

    shape_for_object = object_shapes_cache.get(object_key)
    predicate_label = custom_filter.human_readable_predicate(
        predicate, (highest_priority_class, entity_shape), object_shape_uri=shape_for_object
    )

    object_label = get_object_label(
        object_value,
        predicate,
        shape_for_object,
        object_classes_cache.get(object_key),
        relevant_snapshot,
        custom_filter,
        subject_entity_key=(highest_priority_class, entity_shape),
    )

    # Append a "#n" badge when the predicate defines an ordered sequence and
    # the object's position in it is known.
    order_info = ""
    if subject_uri and validators.url(object_key):
        if predicate_ordering_cache and entity_position_cache:
            if predicate_ordering_cache.get(str(predicate)):
                position = entity_position_cache.get((object_key, str(predicate)))
                if position is not None:
                    order_info = f' <span class="order-position-badge">#{position}</span>'

    return f"""
    <li class='d-flex align-items-center'>
        <span class='flex-grow-1 d-flex flex-column justify-content-center ms-3 mb-2 w-100'>
            <strong>{predicate_label}{order_info}</strong>
            <span class="object-value word-wrap">{object_label}</span>
        </span>
    </li>"""
def get_object_label(
    object_value: str,
    predicate: str,
    object_shape_uri: Optional[str],
    object_class: Optional[str],
    snapshot: Optional[Graph],
    custom_filter: Filter,
    subject_entity_key: Optional[tuple] = None,
) -> str:
    """
    Produce a human-readable display label for a triple's object.

    Args:
        object_value: The value to get a label for
        predicate: The predicate URI
        object_shape_uri: Pre-computed shape URI for the object
        object_class: Pre-computed class for the object
        snapshot: Graph snapshot for context (essential for deleted triples)
        custom_filter (Filter): Custom filter instance for formatting
        subject_entity_key: Tuple of (class, shape) for the subject entity

    Returns:
        str: A human-readable label for the object value
    """
    # rdf:type objects are rendered as the subject's class label.
    if str(predicate) == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
        return custom_filter.human_readable_class(subject_entity_key)

    # Entity references get a resolved label only when a class or shape is
    # known; everything else falls back to the raw string.
    if validators.url(object_value) and (object_shape_uri or object_class):
        return custom_filter.human_readable_entity(
            object_value, (object_class, object_shape_uri), snapshot
        )

    return str(object_value)
def process_modification_data(data: dict) -> Tuple[str, List[dict]]:
    """
    Extract the subject URI and modification list from a request payload.

    Args:
        data: Dictionary containing modification data

    Returns:
        Tuple containing subject URI and list of modification details

    Raises:
        ValueError: when the subject or the modification list is missing/empty.
    """
    subject_uri = data.get("subject")
    modifications = data.get("modifications", [])

    if not subject_uri:
        raise ValueError("No subject URI provided in modification data")
    if not modifications:
        raise ValueError("No modifications provided in data")

    return subject_uri, modifications
def validate_modification(
    modification: dict, subject_uri: str
) -> Tuple[bool, str]:
    """
    Validate a single modification operation against the configured form fields.

    Args:
        modification: Dictionary containing modification details
        subject_uri: URI of the subject being modified

    Returns:
        Tuple of (is_valid, error_message)
    """
    form_fields = get_form_fields()

    operation = modification.get("operation")
    if not operation:
        return False, "No operation specified in modification"

    predicate = modification.get("predicate")
    if not predicate:
        return False, "No predicate specified in modification"

    if operation not in ("add", "remove", "update"):
        return False, f"Invalid operation: {operation}"

    if form_fields:
        entity_type = modification.get("entity_type")
        entity_shape = modification.get("entity_shape")

        # Fall back to the stored types when the client did not send one.
        if not entity_type:
            entity_types = get_entity_types(subject_uri)
            if entity_types:
                entity_type = get_highest_priority_class(entity_types)

        matching_key = find_matching_form_field(entity_type, entity_shape, form_fields)
        if matching_key:
            for field in form_fields[matching_key].get(predicate, []):
                # Required predicates may not be removed.
                if operation == "remove" and field.get("minCount", 0) > 0:
                    return False, f"Cannot remove required predicate: {predicate}"

                # Additions must stay within the declared cardinality.
                if operation == "add":
                    current_count = get_predicate_count(subject_uri, predicate)
                    max_count = field.get("maxCount")
                    if max_count and current_count >= max_count:
                        return (
                            False,
                            f"Maximum count exceeded for predicate: {predicate}",
                        )

    return True, ""
def get_predicate_count(subject_uri: str, predicate: str) -> int:
    """
    Count how many values the entity currently has for a predicate.

    Args:
        subject_uri: URI of the entity
        predicate: Predicate URI to count

    Returns:
        Number of values for the predicate
    """
    endpoint = get_sparql()

    count_query = f"""
    SELECT (COUNT(?o) as ?count) WHERE {{
        <{subject_uri}> <{predicate}> ?o .
    }}
    """
    endpoint.setQuery(count_query)
    endpoint.setReturnFormat(JSON)

    response = endpoint.query().convert()
    return int(response["results"]["bindings"][0]["count"]["value"])
def apply_modifications(
    editor: Editor,
    modifications: List[dict],
    subject_uri: str,
    graph_uri: Optional[str] = None,
):
    """
    Apply a list of modification operations to an entity through the editor.

    Args:
        editor: Editor instance to use for modifications
        modifications: List of modification operations
        subject_uri: URI of the entity being modified
        graph_uri: Optional graph URI for quad store
    """
    subject_ref = URIRef(subject_uri)

    def _as_node(raw_value, datatype):
        # URLs become URI references; anything else becomes a typed literal.
        if validators.url(raw_value):
            return URIRef(raw_value)
        return Literal(raw_value, datatype=URIRef(datatype))

    for mod in modifications:
        operation = mod["operation"]
        predicate_ref = URIRef(mod["predicate"])

        if operation == "remove":
            editor.delete(subject_ref, predicate_ref, graph_uri=graph_uri)

        elif operation == "add":
            datatype = mod.get("datatype", XSD.string)
            editor.create(
                subject_ref, predicate_ref, _as_node(mod["value"], datatype), graph_uri
            )

        elif operation == "update":
            datatype = mod.get("datatype", XSD.string)
            editor.update(
                subject_ref,
                predicate_ref,
                _as_node(mod["oldValue"], datatype),
                _as_node(mod["newValue"], datatype),
                graph_uri,
            )