Coverage for heritrace/routes/entity.py: 89%
684 statements
import json
import re
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import validators
from flask import (Blueprint, abort, current_app, flash, jsonify, redirect,
                   render_template, request, url_for)
from flask_babel import gettext
from flask_login import current_user, login_required
from heritrace.editor import Editor
from heritrace.extensions import (get_change_tracking_config,
                                  get_custom_filter, get_dataset_endpoint,
                                  get_dataset_is_quadstore, get_display_rules,
                                  get_form_fields, get_provenance_endpoint,
                                  get_provenance_sparql, get_shacl_graph,
                                  get_sparql)
from heritrace.forms import *
from heritrace.utils.converters import convert_to_datetime
from heritrace.utils.display_rules_utils import (get_class_priority,
                                                 get_grouped_triples,
                                                 get_highest_priority_class,
                                                 get_property_order_from_rules,
                                                 is_entity_type_visible)
from heritrace.utils.filters import Filter
from heritrace.utils.shacl_utils import get_valid_predicates
from heritrace.utils.sparql_utils import (
    fetch_current_state_with_related_entities, fetch_data_graph_for_subject,
    get_entity_types, parse_sparql_update)
from heritrace.utils.uri_utils import generate_unique_uri
from heritrace.utils.virtuoso_utils import (VIRTUOSO_EXCLUDED_GRAPHS,
                                            is_virtuoso)
from rdflib import RDF, XSD, ConjunctiveGraph, Graph, Literal, URIRef
from resources.datatypes import DATATYPE_MAPPING
from SPARQLWrapper import JSON
from time_agnostic_library.agnostic_entity import AgnosticEntity

entity_bp = Blueprint("entity", __name__)


@entity_bp.route("/about/<path:subject>")
@login_required
def about(subject):
    """
    Display detailed information about an entity.

    Args:
        subject: URI of the entity to display
    """
    # Get necessary services and configurations
    change_tracking_config = get_change_tracking_config()

    # Initialize the agnostic entity and get its history
    agnostic_entity = AgnosticEntity(
        res=subject, config=change_tracking_config, related_entities_history=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    is_deleted = False
    context_snapshot = None
    subject_classes = []

    # Process the entity history
    if history.get(subject):
        sorted_timestamps = sorted(history[subject].keys())
        latest_snapshot = history[subject][sorted_timestamps[-1]]
        latest_metadata = next(
            (
                meta
                for _, meta in provenance[subject].items()
                if meta["generatedAtTime"] == sorted_timestamps[-1]
            ),
            None,
        )

        is_deleted = (
            latest_metadata
            and "invalidatedAtTime" in latest_metadata
            and latest_metadata["invalidatedAtTime"]
        )

        if is_deleted and len(sorted_timestamps) > 1:
            context_snapshot = history[subject][sorted_timestamps[-2]]
            subject_classes = [
                o
                for _, _, o in context_snapshot.triples(
                    (URIRef(subject), RDF.type, None)
                )
            ]
        else:
            context_snapshot = None

    grouped_triples = {}
    can_be_added = []
    can_be_deleted = []
    datatypes = {}
    mandatory_values = {}
    optional_values = {}
    valid_predicates = []
    entity_type = None
    data_graph = None
    linked_resources = []
    inverse_references = []

    if not is_deleted:
        # Fetch the current entity state
        data_graph = fetch_data_graph_for_subject(subject)
        if data_graph:
            triples = list(data_graph.triples((None, None, None)))
            # Get valid predicates and other metadata
            (
                can_be_added,
                can_be_deleted,
                datatypes,
                mandatory_values,
                optional_values,
                subject_classes,
                valid_predicates,
            ) = get_valid_predicates(triples)

            # Group triples for display
            grouped_triples, relevant_properties = get_grouped_triples(
                subject, triples, subject_classes, valid_predicates
            )

            can_be_added = [uri for uri in can_be_added if uri in relevant_properties]
            can_be_deleted = [
                uri for uri in can_be_deleted if uri in relevant_properties
            ]

            # Get resources that this entity links to (outgoing links)
            linked_resources = set()
            for _, predicate, obj in data_graph.triples((URIRef(subject), None, None)):
                if isinstance(obj, URIRef) and str(obj) != str(subject) and predicate != RDF.type:
                    linked_resources.add(str(obj))

            # Get inverse references only for non-deleted entities
            inverse_references = get_inverse_references(subject)

            # Add inverse references to the linked resources
            for ref in inverse_references:
                linked_resources.add(ref["subject"])

            # Convert to a list
            linked_resources = list(linked_resources)
    else:
        # For deleted entities, we don't need to get any linked resources
        linked_resources = []

    update_form = UpdateTripleForm()
    create_form = (
        CreateTripleFormWithSelect() if can_be_added else CreateTripleFormWithInput()
    )
    if can_be_added:
        create_form.predicate.choices = [
            (p, get_custom_filter().human_readable_predicate(p, subject_classes))
            for p in can_be_added
        ]

    form_fields = get_form_fields()
    entity_types = list(form_fields.keys())

    predicate_details_map = {}
    for entity_type_key, predicates in form_fields.items():
        for predicate_uri, details_list in predicates.items():
            for details in details_list:
                shape = details.get("nodeShape")
                key = (predicate_uri, entity_type_key, shape)
                predicate_details_map[key] = details

    # Ensure entity_type is set correctly using the potentially updated subject_classes
    entity_type = str(get_highest_priority_class(subject_classes)) if subject_classes else None

    return render_template(
        "entity/about.jinja",
        subject=subject,
        history=history,
        can_be_added=can_be_added,
        can_be_deleted=can_be_deleted,
        datatypes=datatypes,
        update_form=update_form,
        create_form=create_form,
        mandatory_values=mandatory_values,
        optional_values=optional_values,
        shacl=bool(len(get_shacl_graph())),
        grouped_triples=grouped_triples,
        subject_classes=[str(s_class) for s_class in subject_classes],
        display_rules=get_display_rules(),
        form_fields=form_fields,
        entity_types=entity_types,
        entity_type=entity_type,
        predicate_details_map=predicate_details_map,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
        inverse_references=inverse_references,
        is_deleted=is_deleted,
        context=context_snapshot,
        linked_resources=linked_resources,
    )


@entity_bp.route("/create-entity", methods=["GET", "POST"])
@login_required
def create_entity():
    form_fields = get_form_fields()

    entity_types = sorted(
        [
            entity_type
            for entity_type in form_fields.keys()
            if is_entity_type_visible(entity_type)
        ],
        key=lambda et: get_class_priority(et),
        reverse=True,
    )

    datatype_options = {
        gettext("Text (string)"): XSD.string,
        gettext("Whole number (integer)"): XSD.integer,
        gettext("True or False (boolean)"): XSD.boolean,
        gettext("Date (YYYY-MM-DD)"): XSD.date,
        gettext("Date and Time (YYYY-MM-DDThh:mm:ss)"): XSD.dateTime,
        gettext("Decimal number"): XSD.decimal,
        gettext("Floating point number"): XSD.float,
        gettext("Double precision floating point number"): XSD.double,
        gettext("Time (hh:mm:ss)"): XSD.time,
        gettext("Year (YYYY)"): XSD.gYear,
        gettext("Month (MM)"): XSD.gMonth,
        gettext("Day of the month (DD)"): XSD.gDay,
        gettext("Duration (e.g., P1Y2M3DT4H5M6S)"): XSD.duration,
        gettext("Hexadecimal binary"): XSD.hexBinary,
        gettext("Base64 encoded binary"): XSD.base64Binary,
        gettext("Web address (URL)"): XSD.anyURI,
        gettext("Language code (e.g., en, it)"): XSD.language,
        gettext("Normalized text (no line breaks)"): XSD.normalizedString,
        gettext("Tokenized text (single word)"): XSD.token,
        gettext("Non-positive integer (0 or negative)"): XSD.nonPositiveInteger,
        gettext("Negative integer"): XSD.negativeInteger,
        gettext("Long integer"): XSD.long,
        gettext("Short integer"): XSD.short,
        gettext("Byte-sized integer"): XSD.byte,
        gettext("Non-negative integer (0 or positive)"): XSD.nonNegativeInteger,
        gettext("Positive integer (greater than 0)"): XSD.positiveInteger,
        gettext("Unsigned long integer"): XSD.unsignedLong,
        gettext("Unsigned integer"): XSD.unsignedInt,
        gettext("Unsigned short integer"): XSD.unsignedShort,
        gettext("Unsigned byte"): XSD.unsignedByte,
    }
    if request.method == "POST":
        structured_data = json.loads(request.form.get("structured_data", "{}"))

        editor = Editor(
            get_dataset_endpoint(),
            get_provenance_endpoint(),
            current_app.config["COUNTER_HANDLER"],
            URIRef(f"https://orcid.org/{current_user.orcid}"),
            current_app.config["PRIMARY_SOURCE"],
            current_app.config["DATASET_GENERATION_TIME"],
            dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        )

        if form_fields:
            validation_errors = validate_entity_data(structured_data, form_fields)
            if validation_errors:
                return jsonify({"status": "error", "errors": validation_errors}), 400

            entity_type = structured_data.get("entity_type")
            properties = structured_data.get("properties", {})

            entity_uri = generate_unique_uri(entity_type)
            editor.preexisting_finished()

            default_graph_uri = (
                URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
            )

            for predicate, values in properties.items():
                if not isinstance(values, list):
                    values = [values]

                field_definitions = form_fields.get(entity_type, {}).get(predicate, [])

                # Get the shape from the property value if available
                property_shape = None
                if values and isinstance(values[0], dict):
                    property_shape = values[0].get("shape")

                # Filter field definitions to find the matching one based on shape
                matching_field_def = None
                for field_def in field_definitions:
                    if property_shape:
                        # If the property has a shape, match it with the field definition's subjectShape
                        if field_def.get("subjectShape") == property_shape:
                            matching_field_def = field_def
                            break
                    else:
                        # If no shape is specified, use the first field definition without a shape requirement
                        if not field_def.get("subjectShape"):
                            matching_field_def = field_def
                            break

                # If no matching field definition is found, fall back to the first one
                if not matching_field_def and field_definitions:
                    matching_field_def = field_definitions[0]

                ordered_by = (
                    matching_field_def.get("orderedBy") if matching_field_def else None
                )
                if ordered_by:
                    # Handle ordered properties, grouping the values by shape
                    values_by_shape = {}
                    for value in values:
                        # Get the shape of the entity
                        shape = value.get("shape")
                        if not shape:
                            shape = "default_shape"
                        if shape not in values_by_shape:
                            values_by_shape[shape] = []
                        values_by_shape[shape].append(value)

                    # Process each shape's group of values separately
                    for shape, shape_values in values_by_shape.items():
                        previous_entity = None
                        for value in shape_values:
                            if isinstance(value, dict) and "entity_type" in value:
                                nested_uri = generate_unique_uri(value["entity_type"])
                                editor.create(
                                    entity_uri,
                                    URIRef(predicate),
                                    nested_uri,
                                    default_graph_uri,
                                )
                                create_nested_entity(
                                    editor,
                                    nested_uri,
                                    value,
                                    default_graph_uri,
                                    form_fields,
                                )
                            else:
                                # It's a direct URI value (reference to an existing entity)
                                nested_uri = URIRef(value)
                                editor.create(
                                    entity_uri,
                                    URIRef(predicate),
                                    nested_uri,
                                    default_graph_uri,
                                )

                            if previous_entity:
                                editor.create(
                                    previous_entity,
                                    URIRef(ordered_by),
                                    nested_uri,
                                    default_graph_uri,
                                )
                            previous_entity = nested_uri
                else:
                    # Handle unordered properties
                    for value in values:
                        if isinstance(value, dict) and "entity_type" in value:
                            nested_uri = generate_unique_uri(value["entity_type"])
                            editor.create(
                                entity_uri,
                                URIRef(predicate),
                                nested_uri,
                                default_graph_uri,
                            )
                            create_nested_entity(
                                editor,
                                nested_uri,
                                value,
                                default_graph_uri,
                                form_fields,
                            )
                        else:
                            # Handle both URI references and literal values
                            if validators.url(str(value)):
                                object_value = URIRef(value)
                            else:
                                datatype_uris = []
                                if matching_field_def:
                                    datatype_uris = matching_field_def.get(
                                        "datatypes", []
                                    )
                                datatype = determine_datatype(value, datatype_uris)
                                object_value = Literal(value, datatype=datatype)
                            editor.create(
                                entity_uri,
                                URIRef(predicate),
                                object_value,
                                default_graph_uri,
                            )
        else:
            properties = structured_data.get("properties", {})

            entity_uri = generate_unique_uri()
            editor.import_entity(entity_uri)
            editor.preexisting_finished()

            default_graph_uri = (
                URIRef(f"{entity_uri}/graph") if editor.dataset_is_quadstore else None
            )

            for predicate, values in properties.items():
                if not isinstance(values, list):
                    values = [values]
                for value_dict in values:
                    if value_dict["type"] == "uri":
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            URIRef(value_dict["value"]),
                            default_graph_uri,
                        )
                    elif value_dict["type"] == "literal":
                        datatype = (
                            URIRef(value_dict["datatype"])
                            if "datatype" in value_dict
                            else XSD.string
                        )
                        editor.create(
                            entity_uri,
                            URIRef(predicate),
                            Literal(value_dict["value"], datatype=datatype),
                            default_graph_uri,
                        )

        try:
            editor.save()
            response = jsonify(
                {
                    "status": "success",
                    "redirect_url": url_for("entity.about", subject=str(entity_uri)),
                }
            )
            flash(gettext("Entity created successfully"), "success")
            return response, 200
        except Exception as e:
            error_message = gettext(
                "An error occurred while creating the entity: %(error)s", error=str(e)
            )
            return jsonify({"status": "error", "errors": [error_message]}), 500

    return render_template(
        "create_entity.jinja",
        shacl=bool(get_form_fields()),
        entity_types=entity_types,
        form_fields=form_fields,
        datatype_options=datatype_options,
        dataset_db_triplestore=current_app.config["DATASET_DB_TRIPLESTORE"],
        dataset_db_text_index_enabled=current_app.config[
            "DATASET_DB_TEXT_INDEX_ENABLED"
        ],
    )


def create_nested_entity(
    editor: Editor, entity_uri, entity_data, graph_uri=None, form_fields=None
):
    # Add rdf:type
    editor.create(
        entity_uri,
        URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
        URIRef(entity_data["entity_type"]),
        graph_uri,
    )

    entity_type = entity_data.get("entity_type")
    properties = entity_data.get("properties", {})

    # Add the other properties
    for predicate, values in properties.items():
        if not isinstance(values, list):
            values = [values]
        field_definitions = form_fields.get(entity_type, {}).get(predicate, [])
        for value in values:
            if isinstance(value, dict) and "entity_type" in value:
                if "intermediateRelation" in value:
                    intermediate_uri = generate_unique_uri(
                        value["intermediateRelation"]["class"]
                    )
                    target_uri = generate_unique_uri(value["entity_type"])
                    editor.create(
                        entity_uri, URIRef(predicate), intermediate_uri, graph_uri
                    )
                    editor.create(
                        intermediate_uri,
                        URIRef(value["intermediateRelation"]["property"]),
                        target_uri,
                        graph_uri,
                    )
                    create_nested_entity(
                        editor, target_uri, value, graph_uri, form_fields
                    )
                else:
                    # Handle nested entities
                    nested_uri = generate_unique_uri(value["entity_type"])
                    editor.create(entity_uri, URIRef(predicate), nested_uri, graph_uri)
                    create_nested_entity(
                        editor, nested_uri, value, graph_uri, form_fields
                    )
            else:
                # Handle simple properties
                datatype = XSD.string  # Default to string if not specified
                datatype_uris = []
                if field_definitions:
                    datatype_uris = field_definitions[0].get("datatypes", [])
                datatype = determine_datatype(value, datatype_uris)
                object_value = (
                    URIRef(value)
                    if validators.url(value)
                    else Literal(value, datatype=datatype)
                )
                editor.create(entity_uri, URIRef(predicate), object_value, graph_uri)
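
# Illustrative shape of the `entity_data` payload consumed above (hypothetical
# URIs and values; the actual keys come from the SHACL-derived form fields):
#
#   entity_data = {
#       "entity_type": "http://purl.org/spar/fabio/JournalArticle",
#       "properties": {
#           "http://purl.org/dc/terms/title": ["An example title"],
#           "http://purl.org/spar/pro/isDocumentContextFor": [
#               {"entity_type": "http://purl.org/spar/pro/RoleInTime",
#                "properties": {}},
#           ],
#       },
#   }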


def determine_datatype(value, datatype_uris):
    for datatype_uri in datatype_uris:
        validation_func = next(
            (d[1] for d in DATATYPE_MAPPING if str(d[0]) == str(datatype_uri)), None
        )
        if validation_func and validation_func(value):
            return URIRef(datatype_uri)
    # If none match, default to XSD.string
    return XSD.string
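
# Example (illustrative, assuming DATATYPE_MAPPING holds (URI, validation_func)
# pairs covering the XSD types used here):
#
#   determine_datatype("42", [XSD.integer, XSD.string])   # -> XSD.integer
#   determine_datatype("hello", [XSD.integer])            # -> XSD.string (fallback)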


def validate_entity_data(structured_data, form_fields):
    """
    Validates entity data against form field definitions, considering shape matching.

    Args:
        structured_data (dict): Data to validate, containing entity_type and properties
        form_fields (dict): Form field definitions from SHACL shapes

    Returns:
        list: List of validation error messages, empty if validation passes
    """
    custom_filter = get_custom_filter()

    errors = []
    entity_type = structured_data.get("entity_type")
    if not entity_type:
        errors.append(gettext("Entity type is required"))
    elif entity_type not in form_fields:
        errors.append(
            gettext(
                "Invalid entity type selected: %(entity_type)s",
                entity_type=entity_type,
            )
        )

    if errors:
        return errors

    entity_fields = form_fields.get(entity_type, {})
    properties = structured_data.get("properties", {})

    for prop_uri, prop_values in properties.items():
        if URIRef(prop_uri) == RDF.type:
            continue

        field_definitions = entity_fields.get(prop_uri)
        if not field_definitions:
            errors.append(
                gettext(
                    "Unknown property %(prop_uri)s for entity type %(entity_type)s",
                    prop_uri=prop_uri,
                    entity_type=entity_type,
                )
            )
            continue

        if not isinstance(prop_values, list):
            prop_values = [prop_values]

        # Get the shape from the property value if available
        property_shape = None
        if prop_values and isinstance(prop_values[0], dict):
            property_shape = prop_values[0].get("shape")

        # Filter field definitions to find the matching one based on shape
        matching_field_def = None
        for field_def in field_definitions:
            if property_shape:
                # If the property has a shape, match it with the field definition's subjectShape
                if field_def.get("subjectShape") == property_shape:
                    matching_field_def = field_def
                    break
            else:
                # If no shape is specified, use the first field definition without a shape requirement
                if not field_def.get("subjectShape"):
                    matching_field_def = field_def
                    break

        # If no matching field definition is found, fall back to the first one
        if not matching_field_def and field_definitions:
            matching_field_def = field_definitions[0]

        if matching_field_def:
            # Validate cardinality
            min_count = matching_field_def.get("min", 0)
            max_count = matching_field_def.get("max", None)
            value_count = len(prop_values)

            if value_count < min_count:
                value = gettext("values") if min_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s requires at least %(min_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(
                            prop_uri, [entity_type]
                        ),
                        min_count=min_count,
                        value=value,
                    )
                )
            if max_count is not None and value_count > max_count:
                value = gettext("values") if max_count > 1 else gettext("value")
                errors.append(
                    gettext(
                        "Property %(prop_uri)s allows at most %(max_count)d %(value)s",
                        prop_uri=custom_filter.human_readable_predicate(
                            prop_uri, [entity_type]
                        ),
                        max_count=max_count,
                        value=value,
                    )
                )

            # Validate mandatory values
            mandatory_values = matching_field_def.get("mandatory_values", [])
            for mandatory_value in mandatory_values:
                if mandatory_value not in prop_values:
                    errors.append(
                        gettext(
                            "Property %(prop_uri)s requires the value %(mandatory_value)s",
                            prop_uri=custom_filter.human_readable_predicate(
                                prop_uri, [entity_type]
                            ),
                            mandatory_value=mandatory_value,
                        )
                    )

            # Validate each value
            for value in prop_values:
                if isinstance(value, dict) and "entity_type" in value:
                    nested_errors = validate_entity_data(value, form_fields)
                    errors.extend(nested_errors)
                else:
                    # Validate against datatypes
                    datatypes = matching_field_def.get("datatypes", [])
                    if datatypes:
                        is_valid_datatype = False
                        for dtype in datatypes:
                            validation_func = next(
                                (
                                    d[1]
                                    for d in DATATYPE_MAPPING
                                    if d[0] == URIRef(dtype)
                                ),
                                None,
                            )
                            if validation_func and validation_func(value):
                                is_valid_datatype = True
                                break
                        if not is_valid_datatype:
                            expected_types = ", ".join(
                                [
                                    custom_filter.human_readable_predicate(
                                        dtype, form_fields.keys()
                                    )
                                    for dtype in datatypes
                                ]
                            )
                            errors.append(
                                gettext(
                                    'Value "%(value)s" for property %(prop_uri)s is not of expected type %(expected_types)s',
                                    value=value,
                                    prop_uri=custom_filter.human_readable_predicate(
                                        prop_uri, form_fields.keys()
                                    ),
                                    expected_types=expected_types,
                                )
                            )

                    # Validate against optional values
                    optional_values = matching_field_def.get("optionalValues", [])
                    if optional_values and value not in optional_values:
                        acceptable_values = ", ".join(
                            [
                                custom_filter.human_readable_predicate(
                                    val, form_fields.keys()
                                )
                                for val in optional_values
                            ]
                        )
                        errors.append(
                            gettext(
                                'Value "%(value)s" is not permitted for property %(prop_uri)s. Acceptable values are: %(acceptable_values)s',
                                value=value,
                                prop_uri=custom_filter.human_readable_predicate(
                                    prop_uri, form_fields.keys()
                                ),
                                acceptable_values=acceptable_values,
                            )
                        )

    # In the RDF model, a property with zero values is equivalent to the property being absent,
    # as a triple requires a subject, predicate, and object. Therefore, this section checks for
    # properties defined in the schema that are completely absent from the input data but are
    # required (min_count > 0). This complements the cardinality check above, which only
    # validates properties that are present in the data.
    # Check for missing required properties
    for prop_uri, field_definitions in entity_fields.items():
        if prop_uri not in properties:
            for field_def in field_definitions:
                min_count = field_def.get("min", 0)
                if min_count > 0:
                    value = gettext("values") if min_count > 1 else gettext("value")
                    errors.append(
                        gettext(
                            "Missing required property: %(prop_uri)s requires at least %(min_count)d %(value)s",
                            prop_uri=custom_filter.human_readable_predicate(
                                prop_uri, [entity_type]
                            ),
                            min_count=min_count,
                            value=value,
                        )
                    )
                    break  # Only need to report once per property

    return errors
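
# Illustrative call (hypothetical data; form_fields normally comes from the
# SHACL-derived configuration returned by get_form_fields()):
#
#   errors = validate_entity_data(
#       {"entity_type": "http://purl.org/spar/fabio/JournalArticle",
#        "properties": {"http://purl.org/dc/terms/title": []}},
#       get_form_fields(),
#   )
#   # errors would report the title's minimum cardinality if its min count > 0.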


@entity_bp.route("/entity-history/<path:entity_uri>")
@login_required
def entity_history(entity_uri):
    """
    Display the history of changes for an entity.

    Args:
        entity_uri: URI of the entity
    """
    custom_filter = get_custom_filter()
    change_tracking_config = get_change_tracking_config()

    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, related_entities_history=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    sorted_metadata = sorted(
        provenance[entity_uri].items(),
        key=lambda x: convert_to_datetime(x[1]["generatedAtTime"]),
    )
    sorted_timestamps = [
        convert_to_datetime(meta["generatedAtTime"], stringify=True)
        for _, meta in sorted_metadata
    ]

    # Get the correct context for the entity label
    latest_metadata = sorted_metadata[-1][1] if sorted_metadata else None
    is_latest_deletion = (
        latest_metadata
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    )
    if is_latest_deletion and len(sorted_timestamps) > 1:
        context_snapshot = history[entity_uri][sorted_timestamps[-2]]
    else:
        context_snapshot = history[entity_uri][sorted_timestamps[-1]]

    entity_classes = set()
    classes = list(context_snapshot.triples((URIRef(entity_uri), RDF.type, None)))
    for triple in classes:
        entity_classes.add(str(triple[2]))
    highest_priority_class = get_highest_priority_class(entity_classes)
    entity_classes_for_label = [highest_priority_class] if highest_priority_class else []

    # Generate timeline events
    events = []
    for i, (snapshot_uri, metadata) in enumerate(sorted_metadata):
        date = convert_to_datetime(metadata["generatedAtTime"])
        snapshot_timestamp_str = convert_to_datetime(
            metadata["generatedAtTime"], stringify=True
        )
        snapshot_graph = history[entity_uri][snapshot_timestamp_str]

        responsible_agent = custom_filter.format_agent_reference(
            metadata["wasAttributedTo"]
        )
        primary_source = custom_filter.format_source_reference(
            metadata["hadPrimarySource"]
        )

        description = _format_snapshot_description(
            metadata,
            entity_uri,
            entity_classes_for_label,
            context_snapshot,
            history,
            sorted_timestamps,
            i,
            custom_filter,
        )
        modifications = metadata.get("hasUpdateQuery", "")
        modification_text = ""
        if modifications:
            parsed_modifications = parse_sparql_update(modifications)
            modification_text = generate_modification_text(
                parsed_modifications,
                list(entity_classes),
                history=history,
                entity_uri=entity_uri,
                current_snapshot=snapshot_graph,
                current_snapshot_timestamp=snapshot_timestamp_str,
                custom_filter=custom_filter,
                form_fields=get_form_fields(),
            )

        event = {
            "start_date": {
                "year": date.year,
                "month": date.month,
                "day": date.day,
                "hour": date.hour,
                "minute": date.minute,
                "second": date.second,
            },
            "text": {
                "headline": gettext("Snapshot") + " " + str(i + 1),
                "text": f"""
                    <p><strong>{gettext('Responsible agent')}:</strong> {responsible_agent}</p>
                    <p><strong>{gettext('Primary source')}:</strong> {primary_source}</p>
                    <p><strong>{gettext('Description')}:</strong> {description}</p>
                    <div class="modifications mb-3">
                        {modification_text}
                    </div>
                    <a href='/entity-version/{entity_uri}/{metadata["generatedAtTime"]}' class='btn btn-outline-primary mt-2 view-version' target='_self'>{gettext('View version')}</a>
                """,
            },
            "autolink": False,
        }

        if i + 1 < len(sorted_metadata):
            next_date = convert_to_datetime(
                sorted_metadata[i + 1][1]["generatedAtTime"]
            )
            event["end_date"] = {
                "year": next_date.year,
                "month": next_date.month,
                "day": next_date.day,
                "hour": next_date.hour,
                "minute": next_date.minute,
                "second": next_date.second,
            }

        events.append(event)

    entity_label = custom_filter.human_readable_entity(
        entity_uri, entity_classes, context_snapshot
    )

    timeline_data = {
        "entityUri": entity_uri,
        "entityLabel": entity_label,
        "entityClasses": list(entity_classes),
        "events": events,
    }

    return render_template("entity/history.jinja", timeline_data=timeline_data)


def _format_snapshot_description(
    metadata: dict,
    entity_uri: str,
    entity_classes: list[str],
    context_snapshot: Graph,
    history: dict,
    sorted_timestamps: list[str],
    current_index: int,
    custom_filter: Filter,
) -> str:
    """
    Format the snapshot description, replacing entity URIs with human-readable
    labels (including the other entity involved in a merge snapshot).

    Args:
        metadata: The snapshot metadata dictionary.
        entity_uri: The URI of the main entity.
        entity_classes: The classes of the main entity.
        context_snapshot: The graph snapshot used for context.
        history: The history dictionary containing snapshots.
        sorted_timestamps: Sorted list of snapshot timestamps.
        current_index: The index of the current snapshot in sorted_timestamps.
        custom_filter: The custom filter instance used for formatting.

    Returns:
        The formatted description string.
    """
    description = metadata.get("description", "")
    is_merge_snapshot = False
    was_derived_from = metadata.get("wasDerivedFrom")
    if isinstance(was_derived_from, list) and len(was_derived_from) > 1:
        is_merge_snapshot = True

    if is_merge_snapshot:
        # Regex to find the URI after "merged with", optionally enclosed in single quotes
        match = re.search(r"merged with ['‘]?([^'’<>\s]+)['’]?", description)
        if match:
            potential_merged_uri = match.group(1)
            if validators.url(potential_merged_uri):
                merged_entity_uri_from_desc = potential_merged_uri
                merged_entity_label = None
                if current_index > 0:
                    previous_snapshot_timestamp = sorted_timestamps[current_index - 1]
                    previous_snapshot_graph = history.get(entity_uri, {}).get(previous_snapshot_timestamp)
                    if previous_snapshot_graph:
                        raw_merged_entity_classes = [
                            str(o)
                            for s, p, o in previous_snapshot_graph.triples(
                                (URIRef(merged_entity_uri_from_desc), RDF.type, None)
                            )
                        ]
                        highest_priority_merged_class = get_highest_priority_class(
                            raw_merged_entity_classes
                        ) if raw_merged_entity_classes else None
                        merged_entity_classes_for_label = (
                            [highest_priority_merged_class]
                            if highest_priority_merged_class
                            else []
                        )
                        merged_entity_label = custom_filter.human_readable_entity(
                            merged_entity_uri_from_desc,
                            merged_entity_classes_for_label,
                            previous_snapshot_graph,
                        )
                if (
                    merged_entity_label
                    and merged_entity_label != merged_entity_uri_from_desc
                ):
                    description = description.replace(
                        match.group(0), f"merged with '{merged_entity_label}'"
                    )

    entity_label_for_desc = custom_filter.human_readable_entity(
        entity_uri, entity_classes, context_snapshot
    )
    if entity_label_for_desc and entity_label_for_desc != entity_uri:
        description = description.replace(f"'{entity_uri}'", f"'{entity_label_for_desc}'")

    return description
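
# Illustrative effect on a merge description (hypothetical URIs and labels):
#
#   "Entity 'https://example.org/ra/1' was merged with 'https://example.org/ra/2'"
# becomes, once both URIs resolve to labels,
#   "Entity 'John Doe' was merged with 'J. Doe'"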


@entity_bp.route("/entity-version/<path:entity_uri>/<timestamp>")
@login_required
def entity_version(entity_uri, timestamp):
    """
    Display a specific version of an entity.

    Args:
        entity_uri: URI of the entity
        timestamp: Timestamp of the version to display
    """
    custom_filter = get_custom_filter()
    form_fields = get_form_fields()
    change_tracking_config = get_change_tracking_config()

    try:
        timestamp_dt = datetime.fromisoformat(timestamp)
    except ValueError:
        # Not a valid ISO timestamp: try to resolve it via the provenance graph
        provenance_sparql = get_provenance_sparql()
        query_timestamp = f"""
            SELECT ?generation_time
            WHERE {{
                <{entity_uri}/prov/se/{timestamp}> <http://www.w3.org/ns/prov#generatedAtTime> ?generation_time.
            }}
        """
        provenance_sparql.setQuery(query_timestamp)
        provenance_sparql.setReturnFormat(JSON)
        try:
            generation_time = provenance_sparql.queryAndConvert()["results"][
                "bindings"
            ][0]["generation_time"]["value"]
        except IndexError:
            abort(404)
        timestamp = generation_time
        timestamp_dt = datetime.fromisoformat(generation_time)

    # Get the entity history
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, related_entities_history=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    # Find the closest snapshot
    main_entity_history = history.get(entity_uri, {})
    sorted_timestamps = sorted(
        main_entity_history.keys(), key=lambda t: convert_to_datetime(t)
    )

    if not sorted_timestamps:
        abort(404)

    closest_timestamp = min(
        sorted_timestamps,
        key=lambda t: abs(
            convert_to_datetime(t).astimezone() - timestamp_dt.astimezone()
        ),
    )

    version = main_entity_history[closest_timestamp]
    triples = list(version.triples((URIRef(entity_uri), None, None)))

    # Get the metadata
    entity_metadata = provenance.get(entity_uri, {})
    closest_metadata = None
    min_time_diff = None

    latest_timestamp = max(sorted_timestamps)
    latest_metadata = None

    for se_uri, meta in entity_metadata.items():
        meta_time = convert_to_datetime(meta["generatedAtTime"])
        time_diff = abs((meta_time - timestamp_dt).total_seconds())

        if closest_metadata is None or time_diff < min_time_diff:
            closest_metadata = meta
            min_time_diff = time_diff

        if meta["generatedAtTime"] == latest_timestamp:
            latest_metadata = meta

    if closest_metadata is None or latest_metadata is None:
        abort(404)

    # Check whether this is a deletion snapshot
    is_deletion_snapshot = (
        closest_timestamp == latest_timestamp
        and "invalidatedAtTime" in latest_metadata
        and latest_metadata["invalidatedAtTime"]
    ) or len(triples) == 0

    # Use the appropriate snapshot for context
    context_version = version
    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        current_index = sorted_timestamps.index(closest_timestamp)
        if current_index > 0:
            context_version = main_entity_history[sorted_timestamps[current_index - 1]]

    # Get the subject classes
    if is_deletion_snapshot and len(sorted_timestamps) > 1:
        subject_classes = [
            o
            for _, _, o in context_version.triples((URIRef(entity_uri), RDF.type, None))
        ]
    else:
        subject_classes = [
            o for _, _, o in version.triples((URIRef(entity_uri), RDF.type, None))
        ]

    subject_classes = [get_highest_priority_class(subject_classes)]

    # Process and group the triples
    _, _, _, _, _, _, valid_predicates = get_valid_predicates(triples)
    grouped_triples, relevant_properties = get_grouped_triples(
        entity_uri,
        triples,
        subject_classes,
        valid_predicates,
        historical_snapshot=context_version,
    )

    # Calculate the version number
    snapshot_times = [
        convert_to_datetime(meta["generatedAtTime"])
        for meta in entity_metadata.values()
    ]
    snapshot_times = sorted(set(snapshot_times))
    version_number = snapshot_times.index(timestamp_dt) + 1

    # Find the next and previous snapshots
    next_snapshot_timestamp = None
    prev_snapshot_timestamp = None

    for snap_time in snapshot_times:
        if snap_time > timestamp_dt:
            next_snapshot_timestamp = snap_time.isoformat()
            break

    for snap_time in reversed(snapshot_times):
        if snap_time < timestamp_dt:
            prev_snapshot_timestamp = snap_time.isoformat()
            break

    # Generate the modification text if an update query exists
    modifications = ""
    if closest_metadata.get("hasUpdateQuery"):
        sparql_query = closest_metadata["hasUpdateQuery"]
        parsed_modifications = parse_sparql_update(sparql_query)
        modifications = generate_modification_text(
            parsed_modifications,
            subject_classes,
            history=history,
            entity_uri=entity_uri,
            current_snapshot=version,
            current_snapshot_timestamp=closest_timestamp,
            custom_filter=custom_filter,
            form_fields=form_fields,
        )

    try:
        current_index = sorted_timestamps.index(closest_timestamp)
    except ValueError:
        current_index = -1

    if closest_metadata.get("description"):
        formatted_description = _format_snapshot_description(
            closest_metadata,
            entity_uri,
            subject_classes,
            context_version,
            history,
            sorted_timestamps,
            current_index,
            custom_filter,
        )
        closest_metadata["description"] = formatted_description

    closest_timestamp = closest_metadata["generatedAtTime"]

    return render_template(
        "entity/version.jinja",
        subject=entity_uri,
        metadata={closest_timestamp: closest_metadata},
        timestamp=closest_timestamp,
        next_snapshot_timestamp=next_snapshot_timestamp,
        prev_snapshot_timestamp=prev_snapshot_timestamp,
        modifications=modifications,
        grouped_triples=grouped_triples,
        subject_classes=subject_classes,
        version_number=version_number,
        version=context_version,
    )


@entity_bp.route("/restore-version/<path:entity_uri>/<timestamp>", methods=["POST"])
@login_required
def restore_version(entity_uri, timestamp):
    """
    Restore an entity to a previous version.

    Args:
        entity_uri: URI of the entity to restore
        timestamp: Timestamp of the version to restore to
    """
    timestamp = convert_to_datetime(timestamp, stringify=True)
    change_tracking_config = get_change_tracking_config()

    # Get the entity history
    agnostic_entity = AgnosticEntity(
        res=entity_uri, config=change_tracking_config, related_entities_history=True
    )
    history, provenance = agnostic_entity.get_history(include_prov_metadata=True)

    historical_graph = history.get(entity_uri, {}).get(timestamp)
    if historical_graph is None:
        abort(404)

    current_graph = fetch_current_state_with_related_entities(provenance)

    is_deleted = len(list(current_graph.triples((URIRef(entity_uri), None, None)))) == 0

    triples_or_quads_to_delete, triples_or_quads_to_add = compute_graph_differences(
        current_graph, historical_graph
    )

    # Get all entities that need restoration
    entities_to_restore = get_entities_to_restore(
        triples_or_quads_to_delete, triples_or_quads_to_add, entity_uri
    )

    # Prepare snapshot information for all entities
    entity_snapshots = prepare_entity_snapshots(
        entities_to_restore, provenance, timestamp
    )

    # Create an editor instance
    editor = Editor(
        get_dataset_endpoint(),
        get_provenance_endpoint(),
        current_app.config["COUNTER_HANDLER"],
        URIRef(f"https://orcid.org/{current_user.orcid}"),
        None if is_deleted else entity_snapshots[entity_uri]["source"],
        current_app.config["DATASET_GENERATION_TIME"],
        dataset_is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
    )

    # Import the current state into the editor
    if get_dataset_is_quadstore():
        for quad in current_graph.quads():
            editor.g_set.add(quad)
    else:
        for triple in current_graph:
            editor.g_set.add(triple)

    editor.preexisting_finished()

    # Apply deletions
    for item in triples_or_quads_to_delete:
        if len(item) == 4:
            editor.delete(item[0], item[1], item[2], item[3])
        else:
            editor.delete(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["restoration_source"] = (
                    entity_info["source"]
                )

    # Apply additions
    for item in triples_or_quads_to_add:
        if len(item) == 4:
            editor.create(item[0], item[1], item[2], item[3])
        else:
            editor.create(item[0], item[1], item[2])

        subject = str(item[0])
        if subject in entity_snapshots:
            entity_info = entity_snapshots[subject]
            if entity_info["needs_restore"]:
                editor.g_set.mark_as_restored(URIRef(subject))
                editor.g_set.entity_index[URIRef(subject)]["source"] = entity_info[
                    "source"
                ]

    # Handle the main entity's restoration if needed
    if is_deleted and entity_uri in entity_snapshots:
        editor.g_set.mark_as_restored(URIRef(entity_uri))
        source = entity_snapshots[entity_uri]["source"]
        editor.g_set.entity_index[URIRef(entity_uri)]["source"] = source

    try:
        editor.save()
        flash(gettext("Version restored successfully"), "success")
    except Exception as e:
        flash(
            gettext(
                "An error occurred while restoring the version: %(error)s", error=str(e)
            ),
            "error",
        )

    return redirect(url_for("entity.about", subject=entity_uri))


def compute_graph_differences(
    current_graph: Graph | ConjunctiveGraph, historical_graph: Graph | ConjunctiveGraph
):
    if get_dataset_is_quadstore():
        current_data = set(current_graph.quads())
        historical_data = set(historical_graph.quads())
    else:
        current_data = set(current_graph.triples((None, None, None)))
        historical_data = set(historical_graph.triples((None, None, None)))
    triples_or_quads_to_delete = current_data - historical_data
    triples_or_quads_to_add = historical_data - current_data

    return triples_or_quads_to_delete, triples_or_quads_to_add
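
# The restore diff is a plain set difference in both directions. Illustrative
# sketch with hypothetical triples A, B, C:
#
#   current    = {A, B}
#   historical = {B, C}
#   to_delete  = current - historical   # {A}: present now, absent in the target version
#   to_add     = historical - current   # {C}: absent now, present in the target version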


def get_entities_to_restore(
    triples_or_quads_to_delete: set, triples_or_quads_to_add: set, main_entity_uri: str
) -> set:
    """
    Identify all entities that need to be restored based on the graph differences.

    Args:
        triples_or_quads_to_delete: Set of triples/quads to be deleted
        triples_or_quads_to_add: Set of triples/quads to be added
        main_entity_uri: URI of the main entity being restored

    Returns:
        Set of entity URIs that need to be restored
    """
    entities_to_restore = {main_entity_uri}

    for item in list(triples_or_quads_to_delete) + list(triples_or_quads_to_add):
        predicate = str(item[1])
        if predicate == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
            continue

        subject = str(item[0])
        obj = str(item[2])
        for uri in [subject, obj]:
            if uri != main_entity_uri and validators.url(uri):
                entities_to_restore.add(uri)

    return entities_to_restore
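
# Example (hypothetical URIs): if restoring <https://example.org/br/1> also
# re-adds the triple (<https://example.org/br/1>, dcterms:creator,
# <https://example.org/ra/2>), then <https://example.org/ra/2> enters the set
# so its own snapshots are considered by prepare_entity_snapshots() below.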


def prepare_entity_snapshots(
    entities_to_restore: set, provenance: dict, target_time: str
) -> dict:
    """
    Prepare snapshot information for all entities that need to be restored.

    Args:
        entities_to_restore: Set of entity URIs to process
        provenance: Dictionary containing provenance data for all entities
        target_time: Target restoration time

    Returns:
        Dictionary mapping entity URIs to their restoration information
    """
    entity_snapshots = {}

    for entity_uri in entities_to_restore:
        if entity_uri not in provenance:
            continue

        # Find the appropriate source snapshot
        source_snapshot = find_appropriate_snapshot(provenance[entity_uri], target_time)
        if not source_snapshot:
            continue

        # Check if the entity is currently deleted by examining its latest snapshot
        sorted_snapshots = sorted(
            provenance[entity_uri].items(),
            key=lambda x: convert_to_datetime(x[1]["generatedAtTime"]),
        )
        latest_snapshot = sorted_snapshots[-1][1]
        is_deleted = (
            latest_snapshot.get("invalidatedAtTime")
            and latest_snapshot["generatedAtTime"]
            == latest_snapshot["invalidatedAtTime"]
        )

        entity_snapshots[entity_uri] = {
            "source": source_snapshot,
            "needs_restore": is_deleted,
        }

    return entity_snapshots


def find_appropriate_snapshot(provenance_data: dict, target_time: str) -> Optional[str]:
    """
    Find the most appropriate snapshot to use as a source for restoration.

    Args:
        provenance_data: Dictionary of snapshots and their metadata for an entity
        target_time: The target restoration time as an ISO format string

    Returns:
        The URI of the most appropriate snapshot, or None if no suitable snapshot is found
    """
    target_datetime = convert_to_datetime(target_time)

    # Convert all generation times to datetime for comparison
    valid_snapshots = []
    for snapshot_uri, metadata in provenance_data.items():
        generation_time = convert_to_datetime(metadata["generatedAtTime"])

        # Skip deletion snapshots (where generation time equals invalidation time)
        if (
            metadata.get("invalidatedAtTime")
            and metadata["generatedAtTime"] == metadata["invalidatedAtTime"]
        ):
            continue

        # Only consider snapshots up to our target time
        if generation_time <= target_datetime:
            valid_snapshots.append((generation_time, snapshot_uri))

    if not valid_snapshots:
        return None

    # Sort by generation time and take the most recent one
    valid_snapshots.sort(key=lambda x: x[0])
    return valid_snapshots[-1][1]
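
# Illustrative selection (hypothetical snapshots), with a target time of 12:00:
# snapshots generated at 09:00 and 11:00 are valid candidates, one at 13:00 is
# too late, and a deletion snapshot (generatedAtTime == invalidatedAtTime) is
# skipped; the 11:00 snapshot's URI is returned.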


def get_inverse_references(subject_uri: str) -> List[Dict]:
    """
    Get all entities that reference this entity.

    Args:
        subject_uri: URI of the entity to find references to

    Returns:
        List of dictionaries containing reference information
    """
    sparql = get_sparql()
    custom_filter = get_custom_filter()

    # Build the appropriate query based on the triplestore type
    if is_virtuoso:
        query = f"""
            SELECT DISTINCT ?s ?p ?g WHERE {{
                GRAPH ?g {{
                    ?s ?p <{subject_uri}> .
                }}
                FILTER(?g NOT IN (<{'>, <'.join(VIRTUOSO_EXCLUDED_GRAPHS)}>))
                FILTER(?p != <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>)
            }}
        """
    else:
        query = f"""
            SELECT DISTINCT ?s ?p WHERE {{
                ?s ?p <{subject_uri}> .
                FILTER(?p != <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>)
            }}
        """

    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)
    results = sparql.query().convert()

    references = []
    for result in results["results"]["bindings"]:
        subject = result["s"]["value"]
        predicate = result["p"]["value"]

        # Get the type of the referring entity
        type_query = f"""
            SELECT ?type WHERE {{
                <{subject}> a ?type .
            }}
        """
        sparql.setQuery(type_query)
        type_results = sparql.query().convert()
        types = [t["type"]["value"] for t in type_results["results"]["bindings"]]
        types = [get_highest_priority_class(types)]

        references.append({"subject": subject, "predicate": predicate, "types": types})

    return references
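
# Each returned reference is shaped like (hypothetical values):
#
#   {"subject": "https://example.org/br/1",
#    "predicate": "http://purl.org/vocab/frbr/core#partOf",
#    "types": ["http://purl.org/spar/fabio/JournalIssue"]}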


def generate_modification_text(
    modifications,
    subject_classes,
    history,
    entity_uri,
    current_snapshot,
    current_snapshot_timestamp,
    custom_filter: Filter,
    form_fields,
):
    """
    Generate HTML text describing modifications to an entity, using display rules for property ordering.

    Args:
        modifications (dict): Dictionary of modifications from parse_sparql_update
        subject_classes (list): List of classes for the subject entity
        history (dict): Historical snapshots dictionary
        entity_uri (str): URI of the entity being modified
        current_snapshot (Graph): Current entity snapshot
        current_snapshot_timestamp (str): Timestamp of the current snapshot
        custom_filter (Filter): Filter instance for formatting
        form_fields (dict): Form fields configuration from SHACL
    """
    modification_text = "<p><strong>" + gettext("Modifications") + "</strong></p>"

    # Get the display rules and property order
    display_rules = get_display_rules()
    ordered_properties = get_property_order_from_rules(subject_classes, display_rules)

    for mod_type, triples in modifications.items():
        modification_text += "<ul class='list-group mb-3'><p>"
        if mod_type == gettext("Additions"):
            modification_text += '<i class="bi bi-plus-circle-fill text-success"></i>'
        elif mod_type == gettext("Deletions"):
            modification_text += '<i class="bi bi-dash-circle-fill text-danger"></i>'
        modification_text += " <em>" + gettext(mod_type) + "</em></p>"

        # Group triples by predicate
        predicate_groups = {}
        for triple in triples:
            predicate = str(triple[1])
            if predicate not in predicate_groups:
                predicate_groups[predicate] = []
            predicate_groups[predicate].append(triple)

        # Process predicates in the order given by the display rules
        processed_predicates = set()

        # First handle predicates that are in the ordered list
        for predicate in ordered_properties:
            if predicate in predicate_groups:
                processed_predicates.add(predicate)
                for triple in predicate_groups[predicate]:
                    modification_text += format_triple_modification(
                        triple,
                        subject_classes,
                        mod_type,
                        history,
                        entity_uri,
                        current_snapshot,
                        current_snapshot_timestamp,
                        custom_filter,
                        form_fields,
                    )

        # Then handle any remaining predicates not in the ordered list
        for predicate, triples in predicate_groups.items():
            if predicate not in processed_predicates:
                for triple in triples:
                    modification_text += format_triple_modification(
                        triple,
                        subject_classes,
                        mod_type,
                        history,
                        entity_uri,
                        current_snapshot,
                        current_snapshot_timestamp,
                        custom_filter,
                        form_fields,
                    )

        modification_text += "</ul>"

    return modification_text


def format_triple_modification(
    triple,
    subject_classes,
    mod_type,
    history,
    entity_uri,
    current_snapshot,
    current_snapshot_timestamp,
    custom_filter: Filter,
    form_fields,
):
    """
    Format a single triple modification as HTML.

    Args:
        triple: The RDF triple being modified
        subject_classes: List of classes for the subject entity
        mod_type: Type of modification (addition/deletion)
        history: Historical snapshots dictionary
        entity_uri: URI of the entity being modified
        current_snapshot: Current entity snapshot
        current_snapshot_timestamp: Timestamp of the current snapshot
        custom_filter: Filter instance for formatting
        form_fields: Form fields configuration from SHACL

    Returns:
        HTML string representing the triple modification
    """
    predicate = triple[1]
    predicate_label = custom_filter.human_readable_predicate(predicate, subject_classes)
    object_value = triple[2]

    # Determine which snapshot to use for context
    relevant_snapshot = None
    if (
        mod_type == gettext("Deletions")
        and history
        and entity_uri
        and current_snapshot_timestamp
    ):
        sorted_timestamps = sorted(history[entity_uri].keys())
        current_index = sorted_timestamps.index(current_snapshot_timestamp)
        if current_index > 0:
            relevant_snapshot = history[entity_uri][
                sorted_timestamps[current_index - 1]
            ]
    else:
        relevant_snapshot = current_snapshot

    subject_class = get_highest_priority_class(subject_classes)

    object_label = get_object_label(
        object_value,
        predicate,
        subject_class,
        form_fields,
        relevant_snapshot,
        custom_filter,
    )

    return f"""
        <li class='d-flex align-items-center'>
            <span class='flex-grow-1 d-flex flex-column justify-content-center ms-3 mb-2 w-100'>
                <strong>{predicate_label}</strong>
                <span class="object-value word-wrap">{object_label}</span>
            </span>
        </li>"""


def get_object_label(
    object_value: str,
    predicate: str,
    entity_type: str,
    form_fields: dict,
    snapshot: Optional[Graph],
    custom_filter: Filter,
) -> str:
    """
    Get an appropriate display label for an object value based on the form fields configuration.

    Args:
        object_value: The value to get a label for
        predicate: The predicate URI
        entity_type: The type of the entity
        form_fields: Form fields configuration from SHACL
        snapshot: Optional graph snapshot for context
        custom_filter: Custom filter instance for formatting

    Returns:
        A human-readable label for the object value
    """
    entity_type = str(entity_type)
    predicate = str(predicate)

    # Handle RDF type predicates
    if predicate == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
        return custom_filter.human_readable_predicate(
            object_value, [entity_type]
        ).title()

    if form_fields and entity_type in form_fields:
        predicate_fields = form_fields[entity_type].get(predicate, [])
        for field in predicate_fields:
            # Check if this is an entity reference
            if field.get("nodeShape") or field.get("objectClass"):
                if validators.url(object_value):
                    # Get the types of the referenced entity
                    object_classes = []
                    if snapshot:
                        object_classes = [
                            str(o)
                            for s, p, o in snapshot.triples(
                                (URIRef(object_value), RDF.type, None)
                            )
                        ]

                    if not object_classes and field.get("objectClass"):
                        object_classes = [field["objectClass"]]

                    return custom_filter.human_readable_entity(
                        object_value, object_classes, snapshot
                    )

            # Check for mandatory values
            if field.get("hasValue") == object_value:
                return custom_filter.human_readable_predicate(
                    object_value, [entity_type]
                )

            # Check for optional values from a predefined set
            if object_value in field.get("optionalValues", []):
                return custom_filter.human_readable_predicate(
                    object_value, [entity_type]
                )

    # Default to a simple string representation for literal values
    if not validators.url(object_value):
        return object_value

    # For any other URIs, use human_readable_predicate
    return custom_filter.human_readable_predicate(object_value, [entity_type])


def process_modification_data(data: dict) -> Tuple[str, List[dict]]:
    """
    Process modification data to extract the subject and the modifications.

    Args:
        data: Dictionary containing modification data

    Returns:
        Tuple containing the subject URI and the list of modification details
    """
    subject_uri = data.get("subject")
    if not subject_uri:
        raise ValueError("No subject URI provided in modification data")

    modifications = data.get("modifications", [])
    if not modifications:
        raise ValueError("No modifications provided in data")

    return subject_uri, modifications
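
# Expected input shape (hypothetical values):
#
#   {"subject": "https://example.org/br/1",
#    "modifications": [{"operation": "add",
#                       "predicate": "http://purl.org/dc/terms/title",
#                       "value": "New title"}]}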


def validate_modification(
    modification: dict, subject_uri: str, form_fields: dict
) -> Tuple[bool, str]:
    """
    Validate a single modification operation.

    Args:
        modification: Dictionary containing modification details
        subject_uri: URI of the subject being modified
        form_fields: Form fields configuration from SHACL

    Returns:
        Tuple of (is_valid, error_message)
    """
    operation = modification.get("operation")
    if not operation:
        return False, "No operation specified in modification"

    predicate = modification.get("predicate")
    if not predicate:
        return False, "No predicate specified in modification"

    if operation not in ["add", "remove", "update"]:
        return False, f"Invalid operation: {operation}"

    # Additional validation based on the form fields, if available
    if form_fields:
        entity_types = [str(t) for t in get_entity_types(subject_uri)]
        entity_type = get_highest_priority_class(entity_types)

        if entity_type in form_fields:
            predicate_fields = form_fields[entity_type].get(predicate, [])

            for field in predicate_fields:
                if operation == "remove" and field.get("minCount", 0) > 0:
                    return False, f"Cannot remove required predicate: {predicate}"

                if operation == "add":
                    current_count = get_predicate_count(subject_uri, predicate)
                    max_count = field.get("maxCount")

                    if max_count and current_count >= max_count:
                        return (
                            False,
                            f"Maximum count exceeded for predicate: {predicate}",
                        )

    return True, ""
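
# Illustrative call (hypothetical payload):
#
#   is_valid, error = validate_modification(
#       {"operation": "remove", "predicate": "http://purl.org/dc/terms/title"},
#       "https://example.org/br/1",
#       get_form_fields(),
#   )
#   # -> (False, "Cannot remove required predicate: ...") if the title is mandatory.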


def get_predicate_count(subject_uri: str, predicate: str) -> int:
    """
    Get the current count of values for a predicate.

    Args:
        subject_uri: URI of the entity
        predicate: Predicate URI to count

    Returns:
        Number of values for the predicate
    """
    sparql = get_sparql()

    query = f"""
        SELECT (COUNT(?o) as ?count) WHERE {{
            <{subject_uri}> <{predicate}> ?o .
        }}
    """

    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)
    results = sparql.query().convert()

    return int(results["results"]["bindings"][0]["count"]["value"])
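
# Example (hypothetical): an entity with three dcterms:creator triples yields
#
#   get_predicate_count("https://example.org/br/1",
#                       "http://purl.org/dc/terms/creator")  # -> 3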


def apply_modifications(
    editor: Editor,
    modifications: List[dict],
    subject_uri: str,
    graph_uri: Optional[str] = None,
):
    """
    Apply a list of modifications to an entity.

    Args:
        editor: Editor instance to use for modifications
        modifications: List of modification operations
        subject_uri: URI of the entity being modified
        graph_uri: Optional graph URI for a quad store
    """
    for mod in modifications:
        operation = mod["operation"]
        predicate = mod["predicate"]

        if operation == "remove":
            editor.delete(URIRef(subject_uri), URIRef(predicate), graph_uri=graph_uri)

        elif operation == "add":
            value = mod["value"]
            datatype = mod.get("datatype", XSD.string)

            if validators.url(value):
                object_value = URIRef(value)
            else:
                object_value = Literal(value, datatype=URIRef(datatype))

            editor.create(
                URIRef(subject_uri), URIRef(predicate), object_value, graph_uri
            )

        elif operation == "update":
            old_value = mod["oldValue"]
            new_value = mod["newValue"]
            datatype = mod.get("datatype", XSD.string)

            if validators.url(old_value):
                old_object = URIRef(old_value)
            else:
                old_object = Literal(old_value, datatype=URIRef(datatype))

            if validators.url(new_value):
                new_object = URIRef(new_value)
            else:
                new_object = Literal(new_value, datatype=URIRef(datatype))

            editor.update(
                URIRef(subject_uri),
                URIRef(predicate),
                old_object,
                new_object,
                graph_uri,
            )
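
# Minimal usage sketch combining the helpers above (hypothetical
# `request_payload` and `editor`; apply_modifications only mutates the
# editor's in-memory graph, so editor.save() must still be called):
#
#   subject_uri, modifications = process_modification_data(request_payload)
#   for mod in modifications:
#       is_valid, error = validate_modification(mod, subject_uri, get_form_fields())
#       if not is_valid:
#           raise ValueError(error)
#   apply_modifications(editor, modifications, subject_uri)
#   editor.save()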