Coverage for heritrace / routes / entity / _restoration.py: 88%

189 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7from typing import TYPE_CHECKING 

8 

9from flask import abort, current_app, flash, redirect, url_for 

10from flask_babel import gettext 

11from flask_login import current_user, login_required 

12from rdflib import Dataset, Graph, Literal, URIRef 

13from time_agnostic_library.agnostic_entity import AgnosticEntity 

14 

15if TYPE_CHECKING: 

16 from datetime import datetime 

17 

18 from werkzeug.wrappers import Response 

19 

20from heritrace.apis.orcid import get_responsible_agent_uri 

21from heritrace.editor import Editor, EditorError, EndpointConfig 

22from heritrace.extensions import ( 

23 get_change_tracking_config, 

24 get_dataset_endpoint, 

25 get_dataset_is_quadstore, 

26 get_provenance_endpoint, 

27) 

28from heritrace.routes.entity._blueprint import entity_bp 

29from heritrace.routes.entity._types import _DATETIME_MIN_UTC, _QUAD_LENGTH 

30from heritrace.utils.converters import convert_to_datetime 

31from heritrace.utils.sparql_utils import ( 

32 fetch_current_state_with_related_entities, 

33 get_triples_from_graph, 

34 n3_set_to_graph, 

35) 

36from heritrace.utils.uri_utils import is_valid_url 

37 

38 

# Delete every triple or quad in ``triples_to_delete`` through ``editor`` and
# record restoration metadata for subjects listed in ``entity_snapshots``.
#
# NOTE(review): this listing has lost the original indentation, so the exact
# nesting of the statements below cannot be read from here; the comments only
# state what each statement does.
39def _apply_deletions( 

40 editor: Editor, triples_to_delete: set, entity_snapshots: dict 

41) -> None: 

42 for item in triples_to_delete: 

# Subject and predicate are rebuilt as URIRef from their string form.
43 s, p, o = URIRef(str(item[0])), URIRef(str(item[1])), item[2] 

# The object stays a URIRef when it already is one, otherwise it is wrapped
# in Literal.
44 obj: URIRef | Literal = URIRef(str(o)) if isinstance(o, URIRef) else Literal(o) 

# An item of length _QUAD_LENGTH has a fourth element, passed on as a URIRef.
45 if len(item) == _QUAD_LENGTH: 

46 editor.delete(s, p, obj, URIRef(str(item[3]))) 

47 else: 

48 editor.delete(s, p, obj) 

49 

# entity_snapshots is keyed by the subject's string form.
50 subject = str(item[0]) 

51 if subject in entity_snapshots: 

52 entity_info = entity_snapshots[subject] 

53 if entity_info["needs_restore"]: 

54 editor.g_set.mark_as_restored(URIRef(subject)) 

# NOTE(review): it is not visible here whether this assignment is guarded by
# the "needs_restore" check above or runs for every subject found in
# entity_snapshots -- confirm against the real file.
# NOTE(review): _apply_additions and restore_version store the snapshot under
# the "source" key instead of "restoration_source" -- confirm the different
# key is intentional.
55 editor.g_set.entity_index[URIRef(subject)]["restoration_source"] = ( 

56 entity_info["source"] 

57 ) 

58 

59 

60def _apply_additions( 

61 editor: Editor, triples_to_add: set, entity_snapshots: dict 

62) -> None: 

63 for item in triples_to_add: 

64 s, p, o = URIRef(str(item[0])), URIRef(str(item[1])), item[2] 

65 obj: URIRef | Literal = URIRef(str(o)) if isinstance(o, URIRef) else Literal(o) 

66 if len(item) == _QUAD_LENGTH: 

67 editor.create(s, p, obj, URIRef(str(item[3]))) 

68 else: 

69 editor.create(s, p, obj) 

70 

71 subject = str(item[0]) 

72 if subject in entity_snapshots: 

73 entity_info = entity_snapshots[subject] 

74 if entity_info["needs_restore"]: 

75 editor.g_set.mark_as_restored(URIRef(subject)) 

76 editor.g_set.entity_index[URIRef(subject)]["source"] = entity_info[ 

77 "source" 

78 ] 

79 

80 

def _parse_snapshot_time(value: str) -> datetime:
    """Turn a snapshot timestamp string into a ``datetime``.

    Raises:
        ValueError: when ``convert_to_datetime`` returns ``None`` for ``value``.
    """
    moment = convert_to_datetime(value)
    if moment is not None:
        return moment
    msg = f"Failed to parse snapshot time: {value}"
    raise ValueError(msg)

87 

88 

def get_co_transaction_times(
    entity_provenance: dict, target_time: datetime
) -> set[datetime]:
    """Collect the generation times of snapshots created after ``target_time``.

    Args:
        entity_provenance: mapping whose values are snapshot metadata dicts
            with a ``"generatedAtTime"`` entry.
        target_time: reference moment; only strictly later generation times
            are returned.

    Returns:
        The set of parsed generation times greater than ``target_time``.

    Raises:
        ValueError: propagated from ``_parse_snapshot_time`` when a
            ``"generatedAtTime"`` value cannot be parsed.
    """
    later_times: set[datetime] = set()
    for snapshot_metadata in entity_provenance.values():
        generated_at = _parse_snapshot_time(snapshot_metadata["generatedAtTime"])
        if generated_at > target_time:
            later_times.add(generated_at)
    return later_times

98 

99 

def compute_entity_deltas(
    entity_states: dict[str, set[tuple[str, ...]]],
) -> list[tuple[datetime, set[tuple[str, ...]], set[tuple[str, ...]]]]:
    """Describe an entity's history as chronological ``(time, added, removed)`` steps.

    Args:
        entity_states: mapping from a snapshot timestamp string to the set of
            statements the entity had at that snapshot.

    Returns:
        One tuple per snapshot, ordered by parsed time. ``added`` holds the
        statements absent from the preceding snapshot and ``removed`` those
        that disappeared since it. The first snapshot is compared against an
        empty set.

    Raises:
        ValueError: propagated from ``_parse_snapshot_time`` when a timestamp
            key cannot be parsed.
    """
    timeline = [
        (_parse_snapshot_time(raw_time), statements)
        for raw_time, statements in entity_states.items()
    ]
    timeline.sort(key=lambda entry: entry[0])

    steps = []
    earlier: set[tuple[str, ...]] = set()
    for moment, statements in timeline:
        steps.append((moment, statements - earlier, earlier - statements))
        earlier = statements
    return steps

113 

114 

def build_restored_state(
    entity_states: dict[str, set[tuple[str, ...]]],
    co_transaction_times: set[datetime],
) -> tuple[set[tuple[str, ...]], datetime | None]:
    """Undo the snapshots of one entity that belong to the reverted transactions.

    The entity's deltas are first replayed oldest to newest to rebuild its
    final state. They are then walked newest to oldest, and each delta whose
    time is in ``co_transaction_times`` is inverted: its additions are dropped
    and its removals are put back. Deltas at any other time stay applied, so
    changes unrelated to the restored entity survive.

    Plain set operations are used, so a triple added by a reverted snapshot
    and already removed by a later surviving snapshot stays removed.

    Returns:
        The resulting set of statements, and the earliest snapshot time that
        was reverted (``None`` when nothing was reverted).
    """
    history = compute_entity_deltas(entity_states)

    state: set[tuple[str, ...]] = set()
    for _, gained, lost in history:
        state = (state | gained) - lost

    earliest_reverted = None
    for moment, gained, lost in history[::-1]:
        if moment not in co_transaction_times:
            continue
        state = (state - gained) | lost
        # Times are visited in descending order, so the last one assigned is
        # the earliest reverted snapshot.
        earliest_reverted = moment
    return state, earliest_reverted

138 

139 

140def _build_restored_states( 

141 states: dict[str, dict[str, set[tuple[str, ...]]]], 

142 co_transaction_times: set[datetime], 

143) -> tuple[set[tuple[str, ...]], dict[str, datetime]]: 

144 restored_n3_state: set[tuple[str, ...]] = set() 

145 revert_floors: dict[str, datetime] = {} 

146 for uri, entity_states in states.items(): 

147 entity_restored_state, revert_floor = build_restored_state( 

148 entity_states, co_transaction_times 

149 ) 

150 restored_n3_state |= entity_restored_state 

151 if revert_floor is not None: 

152 revert_floors[uri] = revert_floor 

153 return restored_n3_state, revert_floors 

154 

155 

@entity_bp.route("/restore-version/<path:entity_uri>/<timestamp>", methods=["POST"])
@login_required
def restore_version(entity_uri: str, timestamp: str) -> Response:
    """Restore an entity to one of its recorded snapshots and redirect to it.

    The handler loads the histories of the entity (including related objects,
    merged entities and reverse relations). It collects the generation times
    of the entity's snapshots that are later than ``timestamp`` and reverts,
    for every entity in the histories, the snapshots generated at those times.
    The result is compared with the current data, and the resulting deletions
    and additions are saved through an ``Editor``.

    Args:
        entity_uri: URI of the entity to restore, taken from the URL path.
        timestamp: generation time of the snapshot to go back to.

    Returns:
        A redirect to the ``entity.about`` page of ``entity_uri``. Success or
        failure of the save is reported with a flashed message.

    The request is aborted with 404 when ``timestamp`` cannot be parsed, when
    the entity has no recorded states, or when none of its states was
    generated at ``timestamp``.
    """
    target_ref = URIRef(entity_uri)
    target_time = convert_to_datetime(timestamp)
    if target_time is None:
        abort(404)

    history_reader = AgnosticEntity(
        res=entity_uri,
        config=get_change_tracking_config(),
        include_related_objects=True,
        include_merged_entities=True,
        include_reverse_relations=True,
    )
    states, provenance = history_reader.get_histories_by_entity(
        include_prov_metadata=True
    )

    own_states = states.get(entity_uri)
    if not own_states:
        abort(404)
    known_times = {convert_to_datetime(raw_time) for raw_time in own_states}
    if target_time not in known_times:
        abort(404)

    later_transactions = get_co_transaction_times(
        provenance[entity_uri], target_time
    )
    restored_statements, revert_floors = _build_restored_states(
        states, later_transactions
    )

    historical_graph = n3_set_to_graph(
        restored_statements, is_quadstore=get_dataset_is_quadstore()
    )
    current_graph = fetch_current_state_with_related_entities(provenance)

    # The entity counts as deleted when the current data holds no statement
    # with it as subject.
    entity_is_absent = not list(
        get_triples_from_graph(current_graph, (target_ref, None, None))
    )

    to_delete, to_add = compute_graph_differences(current_graph, historical_graph)
    affected_entities = get_entities_to_restore(to_delete, to_add, entity_uri)
    entity_snapshots = prepare_entity_snapshots(
        affected_entities, provenance, target_time.isoformat(), revert_floors
    )

    # NOTE(review): prepare_entity_snapshots may omit an entity, in which case
    # this lookup raises KeyError for a non-deleted entity -- confirm that
    # cannot happen for the main entity.
    if entity_is_absent:
        primary_source = None
    else:
        primary_source = entity_snapshots[entity_uri]["source"]

    responsible_agent = get_responsible_agent_uri(current_user.orcid)
    app_config = current_app.config
    endpoints = EndpointConfig(
        dataset=get_dataset_endpoint(),
        provenance=get_provenance_endpoint(),
        is_quadstore=app_config["DATASET_IS_QUADSTORE"],
    )
    editor = Editor(
        endpoints,
        app_config["COUNTER_HANDLER"],
        responsible_agent,
        URIRef(primary_source) if primary_source else None,
        app_config["DATASET_GENERATION_TIME"],
    )

    # Load the current data into the editor before applying any change.
    if get_dataset_is_quadstore():
        if not isinstance(current_graph, Dataset):
            msg = "Expected Dataset instance"
            raise TypeError(msg)
        existing_statements = current_graph.quads()
    else:
        existing_statements = current_graph
    for statement in existing_statements:
        editor.g_set.add(statement)  # type: ignore[arg-type]
    editor.preexisting_finished()

    _apply_deletions(editor, to_delete, entity_snapshots)
    _apply_additions(editor, to_add, entity_snapshots)

    if entity_is_absent and entity_uri in entity_snapshots:
        editor.g_set.mark_as_restored(target_ref)
        editor.g_set.entity_index[target_ref]["source"] = entity_snapshots[
            entity_uri
        ]["source"]

    try:
        editor.save()
        flash(gettext("Version restored successfully"), "success")
    except (EditorError, OSError) as exc:
        failure_message = gettext(
            "An error occurred while restoring the version: %(error)s",
            error=str(exc),
        )
        flash(failure_message, "error")

    return redirect(url_for("entity.about", subject=entity_uri))

256 

257 

def compute_graph_differences(
    current_graph: Graph | Dataset, historical_graph: Graph | Dataset
) -> tuple[set, set]:
    """Compare the current data with the historical data to restore.

    Quads are compared when ``get_dataset_is_quadstore()`` is true, triples
    otherwise.

    Returns:
        A pair ``(to_delete, to_add)``: the statements present only in
        ``current_graph`` and those present only in ``historical_graph``.

    Raises:
        TypeError: in quadstore mode, when either argument is not a
            ``Dataset``.
    """
    if not get_dataset_is_quadstore():
        everything = (None, None, None)
        present = set(get_triples_from_graph(current_graph, everything))
        wanted = set(get_triples_from_graph(historical_graph, everything))
        return present - wanted, wanted - present

    if not isinstance(current_graph, Dataset):
        msg = "Expected Dataset instance for current_graph"
        raise TypeError(msg)
    if not isinstance(historical_graph, Dataset):
        msg = "Expected Dataset instance for historical_graph"
        raise TypeError(msg)

    present_quads = set(current_graph.quads())
    wanted_quads = set(historical_graph.quads())
    return present_quads - wanted_quads, wanted_quads - present_quads

276 

277 

def get_entities_to_restore(
    triples_or_quads_to_delete: set, triples_or_quads_to_add: set, main_entity_uri: str
) -> set:
    """List the entities touched by a restoration.

    The result always contains ``main_entity_uri``. Every subject or object of
    a changed statement is added as well when ``is_valid_url`` accepts its
    string form. Statements whose predicate is ``rdf:type`` are ignored.

    Returns:
        A set of entity URIs as strings.
    """
    rdf_type = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
    collected = {main_entity_uri}

    for changes in (triples_or_quads_to_delete, triples_or_quads_to_add):
        for statement in changes:
            if str(statement[1]) == rdf_type:
                continue
            for term in (statement[0], statement[2]):
                candidate = str(term)
                if candidate != main_entity_uri and is_valid_url(candidate):
                    collected.add(candidate)

    return collected

295 

296 

def prepare_entity_snapshots(
    entities_to_restore: set,
    provenance: dict,
    target_time: str,
    revert_floors: dict[str, datetime] | None = None,
) -> dict:
    """Pick, for each entity, the snapshot a restoration derives from.

    Args:
        entities_to_restore: URIs of the entities touched by the restoration.
        provenance: mapping from entity URI to its snapshots' metadata, keyed
            by snapshot URI.
        target_time: timestamp of the version being restored.
        revert_floors: optional mapping from entity URI to the earliest
            snapshot time reverted for that entity.

    Returns:
        A mapping from entity URI to ``{"source": ..., "needs_restore": ...}``.
        ``"source"`` is the snapshot found by ``find_appropriate_snapshot``:
        strictly before the entity's revert floor when it has one, otherwise
        at or before ``target_time``. ``"needs_restore"`` is true when the
        entity's most recent snapshot has equal ``"generatedAtTime"`` and
        ``"invalidatedAtTime"`` values. Entities without provenance or without
        a suitable snapshot are left out.
    """

    def _generation_key(entry: tuple) -> datetime:
        # Snapshots with an unparsable generation time sort first.
        return convert_to_datetime(entry[1]["generatedAtTime"]) or _DATETIME_MIN_UTC

    floors = revert_floors or {}
    prepared = {}

    for uri in entities_to_restore:
        if uri not in provenance:
            continue
        history = provenance[uri]

        floor = floors.get(uri)
        if floor is not None:
            source = find_appropriate_snapshot(
                history, floor.isoformat(), inclusive=False
            )
        else:
            source = find_appropriate_snapshot(history, target_time)
        if not source:
            continue

        newest = sorted(history.items(), key=_generation_key)[-1][1]
        invalidated_at = newest.get("invalidatedAtTime")
        currently_deleted = bool(
            invalidated_at and newest["generatedAtTime"] == invalidated_at
        )

        prepared[uri] = {"source": source, "needs_restore": currently_deleted}

    return prepared

341 

342 

def find_appropriate_snapshot(
    provenance_data: dict, target_time: str, *, inclusive: bool = True
) -> str | None:
    """Return the URI of the latest usable snapshot up to ``target_time``.

    Args:
        provenance_data: mapping from snapshot URI to its metadata dict.
        target_time: upper bound for the snapshot's generation time.
        inclusive: when true, a snapshot generated exactly at ``target_time``
            qualifies; when false, only strictly earlier ones do.

    Returns:
        The URI of the qualifying snapshot with the greatest generation time,
        or ``None`` when no snapshot qualifies. Snapshots whose
        ``"generatedAtTime"`` equals their ``"invalidatedAtTime"`` are
        skipped, as are those whose generation time cannot be parsed.

    Raises:
        ValueError: when ``target_time`` cannot be parsed.
    """
    cutoff = convert_to_datetime(target_time)
    if cutoff is None:
        msg = f"Failed to parse target_time: {target_time}"
        raise ValueError(msg)

    candidates: list[tuple[datetime, str]] = []
    for uri, meta in provenance_data.items():
        generated = convert_to_datetime(meta["generatedAtTime"])

        if (
            meta.get("invalidatedAtTime")
            and meta["generatedAtTime"] == meta["invalidatedAtTime"]
        ):
            continue
        if generated is None:
            continue

        if inclusive:
            qualifies = generated <= cutoff
        else:
            qualifies = generated < cutoff
        if qualifies:
            candidates.append((generated, uri))

    if not candidates:
        return None

    # The sort is stable, so among equal times the last one encountered wins.
    return sorted(candidates, key=lambda pair: pair[0])[-1][1]