Coverage for heritrace / routes / entity / _restoration.py: 88%
189 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7from typing import TYPE_CHECKING
9from flask import abort, current_app, flash, redirect, url_for
10from flask_babel import gettext
11from flask_login import current_user, login_required
12from rdflib import Dataset, Graph, Literal, URIRef
13from time_agnostic_library.agnostic_entity import AgnosticEntity
15if TYPE_CHECKING:
16 from datetime import datetime
18 from werkzeug.wrappers import Response
20from heritrace.apis.orcid import get_responsible_agent_uri
21from heritrace.editor import Editor, EditorError, EndpointConfig
22from heritrace.extensions import (
23 get_change_tracking_config,
24 get_dataset_endpoint,
25 get_dataset_is_quadstore,
26 get_provenance_endpoint,
27)
28from heritrace.routes.entity._blueprint import entity_bp
29from heritrace.routes.entity._types import _DATETIME_MIN_UTC, _QUAD_LENGTH
30from heritrace.utils.converters import convert_to_datetime
31from heritrace.utils.sparql_utils import (
32 fetch_current_state_with_related_entities,
33 get_triples_from_graph,
34 n3_set_to_graph,
35)
36from heritrace.utils.uri_utils import is_valid_url
# Replay every pending deletion on ``editor`` and record restoration metadata.
#
# Each item of ``triples_to_delete`` is indexed positionally: item[0] is the
# subject, item[1] the predicate, item[2] the object and, when the item has
# ``_QUAD_LENGTH`` elements, item[3] is passed to ``editor.delete`` as a fourth
# URIRef argument (presumably the graph identifier - confirm against Editor).
# ``entity_snapshots`` is looked up by the subject's string form and is expected
# to hold dicts with the keys "needs_restore" and "source".
39def _apply_deletions(
40 editor: Editor, triples_to_delete: set, entity_snapshots: dict
41) -> None:
42 for item in triples_to_delete:
# Subject and predicate are always rebuilt as URIRef from their string form.
43 s, p, o = URIRef(str(item[0])), URIRef(str(item[1])), item[2]
# Objects that are not URIRef instances are handed to Literal().
44 obj: URIRef | Literal = URIRef(str(o)) if isinstance(o, URIRef) else Literal(o)
45 if len(item) == _QUAD_LENGTH:
46 editor.delete(s, p, obj, URIRef(str(item[3])))
47 else:
48 editor.delete(s, p, obj)
# Only subjects that have an entry in entity_snapshots get restoration metadata.
50 subject = str(item[0])
51 if subject in entity_snapshots:
52 entity_info = entity_snapshots[subject]
53 if entity_info["needs_restore"]:
54 editor.g_set.mark_as_restored(URIRef(subject))
# NOTE(review): this listing has lost its indentation, so it cannot be told from
# here whether the assignment below is nested under the needs_restore check or
# runs for every subject found in entity_snapshots - confirm in the real file.
# NOTE(review): this writes the "restoration_source" key, while _apply_additions
# writes "source" - verify that the difference is intentional.
55 editor.g_set.entity_index[URIRef(subject)]["restoration_source"] = (
56 entity_info["source"]
57 )
def _apply_additions(
    editor: Editor, triples_to_add: set, entity_snapshots: dict
) -> None:
    """Replay every pending addition on ``editor`` and flag restored entities.

    Each item of ``triples_to_add`` is read positionally as subject,
    predicate, object and, when it has ``_QUAD_LENGTH`` elements, a fourth
    term that is forwarded to ``editor.create`` as an extra ``URIRef``
    (presumably the graph identifier).

    Subjects that have an entry in ``entity_snapshots`` with a truthy
    ``"needs_restore"`` are marked as restored on the editor's graph set and
    their ``"source"`` entry in the entity index is set to the snapshot source.
    """
    for entry in triples_to_add:
        subject_ref = URIRef(str(entry[0]))
        predicate_ref = URIRef(str(entry[1]))
        raw_object = entry[2]
        # URIRef objects are re-wrapped; everything else is treated as a literal.
        if isinstance(raw_object, URIRef):
            object_term: URIRef | Literal = URIRef(str(raw_object))
        else:
            object_term = Literal(raw_object)

        if len(entry) == _QUAD_LENGTH:
            editor.create(
                subject_ref, predicate_ref, object_term, URIRef(str(entry[3]))
            )
        else:
            editor.create(subject_ref, predicate_ref, object_term)

        subject_key = str(entry[0])
        if subject_key not in entity_snapshots:
            continue
        snapshot_info = entity_snapshots[subject_key]
        if not snapshot_info["needs_restore"]:
            continue
        restored_ref = URIRef(subject_key)
        editor.g_set.mark_as_restored(restored_ref)
        editor.g_set.entity_index[restored_ref]["source"] = snapshot_info["source"]
def _parse_snapshot_time(value: str) -> datetime:
    """Convert a snapshot timestamp string into a ``datetime``.

    Raises:
        ValueError: if ``convert_to_datetime`` returns ``None`` for ``value``.
    """
    moment = convert_to_datetime(value)
    if moment is not None:
        return moment
    msg = f"Failed to parse snapshot time: {value}"
    raise ValueError(msg)
def get_co_transaction_times(
    entity_provenance: dict, target_time: datetime
) -> set[datetime]:
    """Collect the generation times that are strictly later than ``target_time``.

    Args:
        entity_provenance: Mapping whose values are snapshot metadata dicts
            carrying a ``"generatedAtTime"`` entry.
        target_time: Reference time; snapshots generated at or before it are
            ignored.

    Returns:
        The set of parsed generation times greater than ``target_time``.

    Raises:
        ValueError: if any ``"generatedAtTime"`` value cannot be parsed.
    """
    later_times: set[datetime] = set()
    for snapshot_metadata in entity_provenance.values():
        generated_at = _parse_snapshot_time(snapshot_metadata["generatedAtTime"])
        if generated_at > target_time:
            later_times.add(generated_at)
    return later_times
def compute_entity_deltas(
    entity_states: dict[str, set[tuple[str, ...]]],
) -> list[tuple[datetime, set[tuple[str, ...]], set[tuple[str, ...]]]]:
    """Turn a timestamp-to-state mapping into chronological deltas.

    Args:
        entity_states: Mapping from a snapshot timestamp string to the set of
            statements that made up the entity at that time.

    Returns:
        One ``(time, added, removed)`` tuple per snapshot, ordered by parsed
        time. ``added`` and ``removed`` are computed against the preceding
        snapshot; the first snapshot is compared with the empty set.

    Raises:
        ValueError: if a timestamp key cannot be parsed.
    """
    timeline = [
        (_parse_snapshot_time(raw_time), snapshot)
        for raw_time, snapshot in entity_states.items()
    ]
    # Sort on the parsed time only, so the state sets are never compared.
    timeline.sort(key=lambda entry: entry[0])

    deltas = []
    earlier: set[tuple[str, ...]] = set()
    for moment, snapshot in timeline:
        added = snapshot - earlier
        removed = earlier - snapshot
        deltas.append((moment, added, removed))
        earlier = snapshot
    return deltas
def build_restored_state(
    entity_states: dict[str, set[tuple[str, ...]]],
    co_transaction_times: set[datetime],
) -> tuple[set[tuple[str, ...]], datetime | None]:
    """Undo the snapshots of one entity that belong to the reverted transactions.

    All deltas are first replayed in chronological order. Then, walking
    backwards, every delta whose time is in ``co_transaction_times`` is
    inverted: what it added is discarded and what it removed is put back.
    Deltas generated at any other time are left in place, so unrelated changes
    survive. Because plain set operations are used, a statement added by a
    reverted snapshot and already removed by a later surviving one stays
    removed.

    Returns:
        The restored statement set and the earliest reverted snapshot time, or
        ``None`` as the second element when nothing was reverted.
    """
    deltas = compute_entity_deltas(entity_states)

    restored: set[tuple[str, ...]] = set()
    for _, added, removed in deltas:
        restored.update(added)
        restored.difference_update(removed)

    revert_floor = None
    for time, added, removed in deltas[::-1]:
        if time not in co_transaction_times:
            continue
        restored.difference_update(added)
        restored.update(removed)
        # Walking newest-to-oldest, the last assignment is the earliest time.
        revert_floor = time
    return restored, revert_floor
def _build_restored_states(
    states: dict[str, dict[str, set[tuple[str, ...]]]],
    co_transaction_times: set[datetime],
) -> tuple[set[tuple[str, ...]], dict[str, datetime]]:
    """Build the restored state of every entity and merge the results.

    Args:
        states: Mapping from entity URI to that entity's timestamp-to-state
            mapping.
        co_transaction_times: Generation times of the snapshots to revert.

    Returns:
        The union of all per-entity restored states, and a mapping from entity
        URI to its revert floor for the entities that had a snapshot reverted.
    """
    merged_state: set[tuple[str, ...]] = set()
    floors: dict[str, datetime] = {}
    for uri, entity_states in states.items():
        per_entity_state, floor = build_restored_state(
            entity_states, co_transaction_times
        )
        merged_state.update(per_entity_state)
        if floor is None:
            continue
        floors[uri] = floor
    return merged_state, floors
@entity_bp.route("/restore-version/<path:entity_uri>/<timestamp>", methods=["POST"])
@login_required
def restore_version(entity_uri: str, timestamp: str) -> Response:
    """Restore ``entity_uri`` to the version identified by ``timestamp``.

    The entity history is fetched together with related objects, merged
    entities and reverse relations. The snapshots of the main entity generated
    after ``timestamp`` define the transactions to undo; the resulting restored
    state is diffed against the current graph and the difference is applied
    through an ``Editor`` and saved.

    Args:
        entity_uri: URI of the entity to restore, taken from the request path.
        timestamp: Snapshot time to restore to, taken from the request path.

    Returns:
        A redirect to the ``entity.about`` page of the entity. A success or an
        error message is flashed depending on whether saving succeeded.

    Raises:
        TypeError: if the dataset is a quadstore but the current graph is not
            a ``Dataset``.

    The request is aborted with HTTP 404 when ``timestamp`` cannot be parsed,
    when the entity has no recorded states, or when ``timestamp`` matches none
    of its snapshot times.
    """
    entity_uri_ref = URIRef(entity_uri)
    timestamp_dt = convert_to_datetime(timestamp)
    if timestamp_dt is None:
        abort(404)
    change_tracking_config = get_change_tracking_config()

    agnostic_entity = AgnosticEntity(
        res=entity_uri,
        config=change_tracking_config,
        include_related_objects=True,
        include_merged_entities=True,
        include_reverse_relations=True,
    )
    states, provenance = agnostic_entity.get_histories_by_entity(
        include_prov_metadata=True
    )

    main_entity_states = states.get(entity_uri)
    if not main_entity_states or timestamp_dt not in {
        convert_to_datetime(ts) for ts in main_entity_states
    }:
        abort(404)

    co_transaction_times = get_co_transaction_times(
        provenance[entity_uri], timestamp_dt
    )
    restored_n3_state, revert_floors = _build_restored_states(
        states, co_transaction_times
    )

    historical_graph = n3_set_to_graph(
        restored_n3_state, is_quadstore=get_dataset_is_quadstore()
    )
    current_graph = fetch_current_state_with_related_entities(provenance)

    # The entity counts as deleted when the current graph holds no statement
    # with it as subject. Only the first match is needed, so the result is not
    # materialised into a list.
    is_deleted = (
        next(
            iter(get_triples_from_graph(current_graph, (entity_uri_ref, None, None))),
            None,
        )
        is None
    )

    triples_or_quads_to_delete, triples_or_quads_to_add = compute_graph_differences(
        current_graph, historical_graph
    )

    entities_to_restore = get_entities_to_restore(
        triples_or_quads_to_delete, triples_or_quads_to_add, entity_uri
    )

    entity_snapshots = prepare_entity_snapshots(
        entities_to_restore, provenance, timestamp_dt.isoformat(), revert_floors
    )

    # prepare_entity_snapshots leaves out entities for which no usable snapshot
    # was found, so the main entity may be missing here. Fall back to "no
    # primary source" instead of failing with a KeyError.
    main_snapshot = entity_snapshots.get(entity_uri)
    source_uri = (
        None if is_deleted or main_snapshot is None else main_snapshot["source"]
    )
    resp_agent = get_responsible_agent_uri(current_user.orcid)
    editor = Editor(
        EndpointConfig(
            dataset=get_dataset_endpoint(),
            provenance=get_provenance_endpoint(),
            is_quadstore=current_app.config["DATASET_IS_QUADSTORE"],
        ),
        current_app.config["COUNTER_HANDLER"],
        resp_agent,
        URIRef(source_uri) if source_uri else None,
        current_app.config["DATASET_GENERATION_TIME"],
    )

    # Load the current state into the editor before recording any change, then
    # signal that the pre-existing data is complete.
    if get_dataset_is_quadstore():
        if not isinstance(current_graph, Dataset):
            msg = "Expected Dataset instance"
            raise TypeError(msg)
        for quad in current_graph.quads():
            editor.g_set.add(quad)  # type: ignore[arg-type]
    else:
        for triple in current_graph:
            editor.g_set.add(triple)  # type: ignore[arg-type]
    editor.preexisting_finished()

    _apply_deletions(editor, triples_or_quads_to_delete, entity_snapshots)
    _apply_additions(editor, triples_or_quads_to_add, entity_snapshots)

    # A deleted main entity is explicitly marked as restored, with the chosen
    # snapshot recorded as its source.
    if is_deleted and main_snapshot is not None:
        editor.g_set.mark_as_restored(entity_uri_ref)
        editor.g_set.entity_index[entity_uri_ref]["source"] = main_snapshot["source"]

    try:
        editor.save()
        flash(gettext("Version restored successfully"), "success")
    except (EditorError, OSError) as e:
        flash(
            gettext(
                "An error occurred while restoring the version: %(error)s", error=str(e)
            ),
            "error",
        )

    return redirect(url_for("entity.about", subject=entity_uri))
def compute_graph_differences(
    current_graph: Graph | Dataset, historical_graph: Graph | Dataset
) -> tuple[set, set]:
    """Compute what must be deleted and added to turn one graph into another.

    Quads are compared when the dataset is a quadstore, triples otherwise.

    Returns:
        A pair ``(to_delete, to_add)``: statements present only in
        ``current_graph`` and statements present only in ``historical_graph``.

    Raises:
        TypeError: if the dataset is a quadstore and either graph is not a
            ``Dataset``.
    """
    if not get_dataset_is_quadstore():
        everything = (None, None, None)
        present = set(get_triples_from_graph(current_graph, everything))
        wanted = set(get_triples_from_graph(historical_graph, everything))
        return present.difference(wanted), wanted.difference(present)

    if not isinstance(current_graph, Dataset):
        msg = "Expected Dataset instance for current_graph"
        raise TypeError(msg)
    if not isinstance(historical_graph, Dataset):
        msg = "Expected Dataset instance for historical_graph"
        raise TypeError(msg)

    present = set(current_graph.quads())
    wanted = set(historical_graph.quads())
    return present.difference(wanted), wanted.difference(present)
def get_entities_to_restore(
    triples_or_quads_to_delete: set, triples_or_quads_to_add: set, main_entity_uri: str
) -> set:
    """Collect the URIs of the entities touched by a restoration.

    The result always contains ``main_entity_uri``. For every statement to
    delete or add whose predicate is not ``rdf:type``, the string forms of its
    subject and object are added when they pass ``is_valid_url``.
    """
    rdf_type = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
    collected = {main_entity_uri}

    for statements in (list(triples_or_quads_to_delete), list(triples_or_quads_to_add)):
        for statement in statements:
            if str(statement[1]) == rdf_type:
                continue
            for candidate in (str(statement[0]), str(statement[2])):
                if candidate == main_entity_uri:
                    continue
                if is_valid_url(candidate):
                    collected.add(candidate)

    return collected
def prepare_entity_snapshots(
    entities_to_restore: set,
    provenance: dict,
    target_time: str,
    revert_floors: dict[str, datetime] | None = None,
) -> dict:
    """Choose a source snapshot for each entity and detect deleted entities.

    Args:
        entities_to_restore: URIs of the entities involved in the restoration.
        provenance: Mapping from entity URI to a mapping of snapshot URI to
            snapshot metadata.
        target_time: Time to restore to; used for entities without a revert
            floor.
        revert_floors: Optional mapping from entity URI to a time; when an
            entity has one, its source snapshot must be generated strictly
            before that time.

    Returns:
        A mapping from entity URI to ``{"source": ..., "needs_restore": ...}``.
        Entities missing from ``provenance`` or without a suitable snapshot
        are left out. ``"needs_restore"`` is true when the most recently
        generated snapshot has equal generation and invalidation times.
    """
    floors = revert_floors or {}
    snapshots_by_entity = {}

    for entity_uri in entities_to_restore:
        if entity_uri not in provenance:
            continue
        entity_provenance = provenance[entity_uri]

        floor = floors.get(entity_uri)
        if floor is not None:
            chosen = find_appropriate_snapshot(
                entity_provenance, floor.isoformat(), inclusive=False
            )
        else:
            chosen = find_appropriate_snapshot(entity_provenance, target_time)
        if not chosen:
            continue

        # Unparseable generation times sort first via the minimum datetime.
        chronological = sorted(
            entity_provenance.items(),
            key=lambda pair: (
                convert_to_datetime(pair[1]["generatedAtTime"]) or _DATETIME_MIN_UTC
            ),
        )
        _, newest = chronological[-1]
        needs_restore = bool(
            newest.get("invalidatedAtTime")
            and newest["generatedAtTime"] == newest["invalidatedAtTime"]
        )

        snapshots_by_entity[entity_uri] = {
            "source": chosen,
            "needs_restore": needs_restore,
        }

    return snapshots_by_entity
def find_appropriate_snapshot(
    provenance_data: dict, target_time: str, *, inclusive: bool = True
) -> str | None:
    """Return the URI of the latest usable snapshot up to ``target_time``.

    Snapshots whose generation and invalidation times are equal are skipped,
    as are snapshots whose generation time cannot be parsed.

    Args:
        provenance_data: Mapping from snapshot URI to snapshot metadata.
        target_time: Upper bound for the snapshot generation time.
        inclusive: When true a snapshot generated exactly at ``target_time``
            qualifies; when false it must be strictly earlier.

    Returns:
        The URI of the qualifying snapshot with the greatest generation time,
        or ``None`` when no snapshot qualifies.

    Raises:
        ValueError: if ``target_time`` cannot be parsed.
    """
    cutoff = convert_to_datetime(target_time)
    if cutoff is None:
        msg = f"Failed to parse target_time: {target_time}"
        raise ValueError(msg)

    best_time: datetime | None = None
    best_uri: str | None = None
    for snapshot_uri, metadata in provenance_data.items():
        generated_at = convert_to_datetime(metadata["generatedAtTime"])

        invalidated_raw = metadata.get("invalidatedAtTime")
        if invalidated_raw and metadata["generatedAtTime"] == invalidated_raw:
            continue
        if generated_at is None:
            continue

        if inclusive:
            within_bound = generated_at <= cutoff
        else:
            within_bound = generated_at < cutoff
        if not within_bound:
            continue

        # ">=" keeps the later entry on equal times, like taking the last
        # element of a stable sort.
        if best_time is None or generated_at >= best_time:
            best_time = generated_at
            best_uri = snapshot_uri

    return best_uri