Coverage for src / time_agnostic_library / agnostic_entity.py: 100%

563 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-03-21 11:54 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7import re 

8from datetime import datetime 

9from functools import lru_cache 

10 

11from time_agnostic_library.prov_entity import ProvEntity 

12from time_agnostic_library.sparql import Sparql, _n3_value 

13from time_agnostic_library.support import convert_to_datetime 

14 

# Matches the keyword opening a SPARQL "DELETE DATA" / "INSERT DATA" operation.
_OPERATION_RE = re.compile(r'(DELETE|INSERT)\s+DATA', re.IGNORECASE)
# Matches the opening of a "GRAPH <uri> {" block and captures the graph URI.
_GRAPH_BLOCK_RE = re.compile(r'GRAPH\s*<([^>]+)>\s*\{', re.IGNORECASE)

# One alternative per RDF term form; the group numbers are relied upon by
# _regex_match_to_n3:
#   1: URI                      2+3: typed literal body + datatype URI
#   4+5: language-tagged literal body + tag
#   6: double-quoted literal    7: single-quoted literal    8: blank node
_RDF_TERM_RE = re.compile(
    r'<([^>]+)>'
    r'|"((?:[^"\\]|\\.)*)"\^\^<([^>]+)>'
    r'|"((?:[^"\\]|\\.)*)"@([a-zA-Z][\w-]*)'
    r'|"((?:[^"\\]|\\.)*)"'
    r"|'((?:[^'\\]|\\.)*)'"
    r'|(_:\S+)',
    re.DOTALL,
)

# Backslash escape sequences inside literal bodies; characters not in the map
# pass through unchanged (e.g. \" -> ").
_ESCAPE_CHAR_RE = re.compile(r'\\(.)')
_ESCAPE_CHAR_MAP = {'n': '\n', 'r': '\r', 't': '\t'}

_RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"

32 

33 

34def _unescape_literal(s: str) -> str: 

35 if '\\' not in s: 

36 return s 

37 return _ESCAPE_CHAR_RE.sub( 

38 lambda m: _ESCAPE_CHAR_MAP.get(m.group(1), m.group(1)), s 

39 ) 

40 

41 

def _normalize_literal(raw: str) -> str:
    """Unescape *raw*, then re-escape it into canonical N3 double-quoted form."""
    text = _unescape_literal(raw)
    # Order matters: backslashes must be doubled before adding new escapes.
    for plain, escaped in (('\\', '\\\\'), ('"', '\\"'), ('\n', '\\n'), ('\r', '\\r')):
        text = text.replace(plain, escaped)
    return text

45 

46 

47def _regex_match_to_n3(match: re.Match) -> str: 

48 uri = match.group(1) 

49 if uri is not None: 

50 return f"<{uri}>" 

51 

52 typed_value = match.group(2) 

53 if typed_value is not None: 

54 return f'"{_normalize_literal(typed_value)}"^^<{match.group(3)}>' 

55 

56 lang_value = match.group(4) 

57 if lang_value is not None: 

58 return f'"{_normalize_literal(lang_value)}"@{match.group(5)}' 

59 

60 double_quoted = match.group(6) 

61 if double_quoted is not None: 

62 return f'"{_normalize_literal(double_quoted)}"' 

63 

64 single_quoted = match.group(7) 

65 if single_quoted is not None: 

66 return f'"{_normalize_literal(single_quoted)}"' 

67 

68 return match.group(8) 

69 

70 

# Finds the next brace or quote character while walking a GRAPH block body.
_BRACE_OR_QUOTE_RE = re.compile(r'[{}\'"]')


def _find_matching_close_brace(text: str, start: int) -> int:
    """Return the index of the '}' closing the brace opened just before *start*.

    Braces inside quoted strings are ignored; a quote preceded by an odd
    number of backslashes is treated as escaped and does not terminate the
    string. Returns len(text) when no matching brace exists.
    """
    pos = start
    length = len(text)
    depth = 1  # one brace is already open when this function is called
    while pos < length:
        m = _BRACE_OR_QUOTE_RE.search(text, pos)
        if m is None:
            return length
        pos = m.start()
        char = text[pos]
        if char == '{':
            depth += 1
            pos += 1
        elif char == '}':
            depth -= 1
            if depth == 0:
                return pos
            pos += 1
        else:
            # Entered a quoted literal: skip ahead to its closing quote.
            quote_char = char
            pos += 1
            while pos < length:
                q = text.find(quote_char, pos)
                if q == -1:
                    pos = length
                    break
                # Count backslashes immediately before the candidate quote;
                # an even count means the quote is unescaped and closes the
                # literal.
                num_backslashes = 0
                check = q - 1
                while check >= start and text[check] == '\\':
                    num_backslashes += 1
                    check -= 1
                if num_backslashes % 2 == 0:
                    pos = q + 1
                    break
                pos = q + 1
    return length

110 

111 

def _fast_parse_update(update_query: str) -> list[tuple[str, list[tuple[str, str, str, str]]]]:
    """Parse the DELETE DATA / INSERT DATA blocks of a SPARQL update.

    Returns a list of (operation_type, quads) pairs, where operation_type is
    'DeleteData' or 'InsertData' and each quad is an N3-serialized
    (subject, predicate, object, graph) tuple. Only triples inside
    GRAPH <...> { ... } blocks are collected.
    """
    parsed: list[tuple[str, list[tuple[str, str, str, str]]]] = []
    op_matches = list(_OPERATION_RE.finditer(update_query))
    total_len = len(update_query)

    for idx, op in enumerate(op_matches):
        kind = 'DeleteData' if op.group(1).upper() == 'DELETE' else 'InsertData'

        # The operation body runs up to the next operation keyword (or EOF).
        body_start = op.end()
        body_end = op_matches[idx + 1].start() if idx + 1 < len(op_matches) else total_len
        body = update_query[body_start:body_end]

        quads: list[tuple[str, str, str, str]] = []
        for graph_match in _GRAPH_BLOCK_RE.finditer(body):
            graph_n3 = f"<{graph_match.group(1)}>"
            block_start = graph_match.end()
            block_end = _find_matching_close_brace(body, block_start)
            triples_src = body[block_start:block_end]

            # Terms come in subject/predicate/object order; every third term
            # completes one quad.
            pending: list[str] = []
            for term_match in _RDF_TERM_RE.finditer(triples_src):
                pending.append(_regex_match_to_n3(term_match))
                if len(pending) == 3:
                    quads.append((pending[0], pending[1], pending[2], graph_n3))
                    pending = []

        parsed.append((kind, quads))

    return parsed

141 

142 

def _compose_update_queries(
    update_queries: list[str],
) -> tuple[set[tuple[str, ...]], set[tuple[str, ...]]]:
    """Fold a sequence of update queries into net (additions, deletions).

    A quad deleted after being added (or added after being deleted) within
    the sequence cancels out instead of appearing in both sets.
    """
    additions: set[tuple[str, ...]] = set()
    deletions: set[tuple[str, ...]] = set()
    for query in update_queries:
        for operation_type, quads in _fast_parse_update(query):
            if operation_type == 'DeleteData':
                for quad in quads:
                    if quad in additions:
                        additions.remove(quad)  # cancels a prior insertion
                    else:
                        deletions.add(quad)
            elif operation_type == 'InsertData':
                for quad in quads:
                    if quad in deletions:
                        deletions.remove(quad)  # cancels a prior deletion
                    else:
                        additions.add(quad)
    return additions, deletions

163 

164 

# Default location of the library configuration file.
CONFIG_PATH = "./config.json"

166 

167 

@lru_cache(maxsize=4096)
def _parse_datetime(time_string: str) -> datetime:
    """Parse *time_string* into a datetime via convert_to_datetime, memoized.

    Raises:
        TypeError: if convert_to_datetime does not return a datetime
            (the previous ``assert`` would silently vanish under ``python -O``).
    """
    result = convert_to_datetime(time_string)
    if not isinstance(result, datetime):
        raise TypeError(
            f"convert_to_datetime({time_string!r}) returned "
            f"{type(result).__name__}, expected datetime"
        )
    return result

173 

174 

# N3-serialized provenance predicates, precomputed for fast quad filtering.
_GEN_AT_TIME_N3 = f"<{ProvEntity.iri_generated_at_time}>"
_HAS_UQ_N3 = f"<{ProvEntity.iri_has_update_query}>"

177 

178 

def _extract_snapshot_update_queries(quads: set[tuple[str, ...]]) -> dict[str, str | None]:
    """Map each snapshot's generation time to its update query (None if absent).

    Groups the generated-at-time and has-update-query quads by subject, then
    emits one entry per subject that carries a generation time.
    """
    per_subject: dict[str, dict[str, str]] = {}
    for quad in quads:
        predicate = quad[1]
        if predicate in (_GEN_AT_TIME_N3, _HAS_UQ_N3):
            per_subject.setdefault(quad[0], {})[predicate] = _n3_value(quad[2])
    return {
        props[_GEN_AT_TIME_N3]: props.get(_HAS_UQ_N3)
        for props in per_subject.values()
        if _GEN_AT_TIME_N3 in props
    }

189 

190 

# PROV namespace prefix and rdf:type in N3 form: used to exclude provenance
# and typing triples when scanning for related objects.
_PROV_PREFIX = ProvEntity.PROV
_RDF_TYPE_N3 = f"<{_RDF_TYPE}>"

193 

194 

def _find_related_object_uris(entity_uri: str, graphs: dict) -> set[str]:
    """Collect object URIs linked from *entity_uri* across all quad sets.

    Skips None quad sets, non-URI objects, rdf:type statements and any
    predicate in the PROV namespace.
    """
    subject_n3 = f"<{entity_uri}>"
    related: set[str] = set()
    for quad_set in graphs.values():
        if quad_set is None:
            continue
        for quad in quad_set:
            if quad[0] != subject_n3:
                continue
            predicate, obj = quad[1], quad[2]
            if obj.startswith('<') and predicate != _RDF_TYPE_N3 and _PROV_PREFIX not in predicate:
                related.add(_n3_value(obj))
    return related

205 

206 

207class AgnosticEntity: 

208 def __init__(self, res:str, config:dict, include_related_objects:bool=False, include_merged_entities:bool=False, include_reverse_relations:bool=False): 

209 self.res = res 

210 self.include_related_objects = include_related_objects 

211 self.include_merged_entities = include_merged_entities 

212 self.include_reverse_relations = include_reverse_relations 

213 self.config = config 

214 

215 def get_history(self, include_prov_metadata: bool=False) -> tuple: 

216 if self.include_related_objects or self.include_merged_entities or self.include_reverse_relations: 

217 histories = {} 

218 self._collect_all_related_entities_histories(histories, include_prov_metadata) 

219 return self._get_merged_histories(histories, include_prov_metadata) 

220 else: 

221 entity_history = self._get_entity_current_state(include_prov_metadata) 

222 entity_history = self._get_old_graphs(entity_history) 

223 for uri, time_dict in entity_history[0].items(): 

224 for ts, quad_set in time_dict.items(): 

225 if quad_set is None: 

226 entity_history[0][uri][ts] = set() 

227 return tuple(entity_history) 

228 

229 def _collect_all_related_entities_histories( 

230 self, 

231 histories: dict, 

232 include_prov_metadata: bool 

233 ) -> None: 

234 main_entity = AgnosticEntity(self.res, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False) 

235 entity_history = main_entity._get_entity_current_state(include_prov_metadata) 

236 entity_history = main_entity._get_old_graphs(entity_history) 

237 histories[self.res] = (entity_history[0], entity_history[1]) 

238 

239 processed_entities = {self.res} 

240 

241 if self.include_related_objects: 

242 self._collect_related_objects_recursively(self.res, processed_entities, histories, include_prov_metadata) 

243 

244 if self.include_merged_entities: 

245 self._collect_merged_entities_recursively(self.res, processed_entities, histories, include_prov_metadata) 

246 

247 if self.include_reverse_relations: 

248 self._collect_reverse_relations_recursively(self.res, processed_entities, histories, include_prov_metadata) 

249 

250 def _collect_related_objects_recursively( 

251 self, 

252 entity_uri: str, 

253 processed_entities: set[str], 

254 histories: dict, 

255 include_prov_metadata: bool, 

256 depth: int | None = None 

257 ) -> None: 

258 if depth is not None and depth <= 0: 

259 return 

260 

261 next_depth = None if depth is None else depth - 1 

262 

263 entity_graphs = histories[entity_uri][0][entity_uri] if entity_uri in histories else None 

264 if not entity_graphs: 

265 return 

266 

267 for obj_uri in _find_related_object_uris(entity_uri, entity_graphs): 

268 if obj_uri not in processed_entities: 

269 processed_entities.add(obj_uri) 

270 agnostic_entity = AgnosticEntity(obj_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False) 

271 entity_history = agnostic_entity._get_entity_current_state(include_prov_metadata) 

272 entity_history = agnostic_entity._get_old_graphs(entity_history) 

273 histories[obj_uri] = (entity_history[0], entity_history[1]) 

274 self._collect_related_objects_recursively(obj_uri, processed_entities, histories, include_prov_metadata, next_depth) 

275 

276 def _collect_merged_entities_recursively( 

277 self, 

278 entity_uri: str, 

279 processed_entities: set[str], 

280 histories: dict, 

281 include_prov_metadata: bool, 

282 depth: int | None = None 

283 ) -> None: 

284 if depth is not None and depth <= 0: 

285 return 

286 

287 next_depth = None if depth is None else depth - 1 

288 

289 merged_entities = self._find_merged_entities(entity_uri) 

290 

291 for merged_entity_uri in merged_entities: 

292 if merged_entity_uri not in processed_entities: 

293 processed_entities.add(merged_entity_uri) 

294 agnostic_entity = AgnosticEntity(merged_entity_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False) 

295 entity_history = agnostic_entity._get_entity_current_state(include_prov_metadata) 

296 entity_history = agnostic_entity._get_old_graphs(entity_history) 

297 histories[merged_entity_uri] = (entity_history[0], entity_history[1]) 

298 self._collect_merged_entities_recursively(merged_entity_uri, processed_entities, histories, include_prov_metadata, next_depth) 

299 

300 def _collect_reverse_relations_recursively( 

301 self, 

302 entity_uri: str, 

303 processed_entities: set[str], 

304 histories: dict, 

305 include_prov_metadata: bool, 

306 depth: int | None = None 

307 ) -> None: 

308 if depth is not None and depth <= 0: 

309 return 

310 

311 next_depth = None if depth is None else depth - 1 

312 

313 reverse_related_entities = self._find_reverse_related_entities(entity_uri) 

314 

315 for reverse_entity_uri in reverse_related_entities: 

316 if reverse_entity_uri not in processed_entities: 

317 processed_entities.add(reverse_entity_uri) 

318 agnostic_entity = AgnosticEntity(reverse_entity_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False) 

319 entity_history = agnostic_entity._get_entity_current_state(include_prov_metadata) 

320 entity_history = agnostic_entity._get_old_graphs(entity_history) 

321 histories[reverse_entity_uri] = (entity_history[0], entity_history[1]) 

322 self._collect_reverse_relations_recursively(reverse_entity_uri, processed_entities, histories, include_prov_metadata, next_depth) 

323 

    def _get_merged_histories(
        self,
        histories: dict,
        include_prov_metadata: bool
    ) -> tuple:
        """Merge related-entity histories into the main entity's timeline.

        For each of the main entity's snapshot times, the latest state (at or
        before that time) of every related entity is unioned into the quad
        set. Returns (merged_histories, metadata).
        """
        entity_histories = {}
        metadata = {}
        for entity_uri, (entity_history_dict, entity_metadata) in histories.items():
            entity_histories[entity_uri] = entity_history_dict[entity_uri]
            if include_prov_metadata and entity_metadata:
                metadata[entity_uri] = entity_metadata[entity_uri]

        # The merged timeline is driven by the main entity's snapshot times.
        main_entity_times = sorted(
            entity_histories[self.res].keys(), key=lambda x: _parse_datetime(x)
        )

        merged_histories = {self.res: {}}

        # Pre-sort each related entity's timestamps once (ascending).
        related_sorted_times = {}
        for entity_uri, entity_history in entity_histories.items():
            if entity_uri == self.res:
                continue
            related_sorted_times[entity_uri] = sorted(
                ((t, _parse_datetime(t)) for t in entity_history),
                key=lambda x: x[1]
            )

        for timestamp in main_entity_times:
            merged_set = set(entity_histories[self.res][timestamp])
            timestamp_dt = _parse_datetime(timestamp)

            for entity_uri, sorted_times in related_sorted_times.items():
                # Latest related snapshot not after the main timestamp.
                relevant_time = None
                for etime, etime_dt in sorted_times:
                    if etime_dt <= timestamp_dt:
                        relevant_time = etime
                    else:
                        break
                if relevant_time:
                    merged_set.update(entity_histories[entity_uri][relevant_time])

            merged_histories[self.res][timestamp] = merged_set

        return merged_histories, metadata

368 

369 def get_state_at_time( 

370 self, 

371 time: tuple[str | None, str | None], 

372 include_prov_metadata: bool = False, 

373 ) -> tuple: 

374 if self.include_related_objects or self.include_merged_entities or self.include_reverse_relations: 

375 histories = {} 

376 self._collect_all_related_entities_states_at_time(histories, time, include_prov_metadata) 

377 return self._get_merged_histories_at_time(histories, include_prov_metadata) 

378 else: 

379 return self._get_entity_state_at_time(time, include_prov_metadata) 

380 

    def get_delta(
        self,
        time_start: str,
        time_end: str,
    ) -> tuple[set[tuple[str, ...]], set[tuple[str, ...]]]:
        """Compute net (additions, deletions) between *time_start* and *time_end*.

        Replays, in chronological order, the update queries of every snapshot
        generated in (time_start, time_end]. If the entity's first snapshot
        is after time_start, its whole state at time_end is returned as
        additions instead.
        """
        is_quadstore = self.config["provenance"]["is_quadstore"]
        graph_statement = f"GRAPH <{self.res}/prov/>" if is_quadstore else ""
        query_snapshots = f"""
            SELECT ?time ?updateQuery
            WHERE {{
                {graph_statement}
                {{
                    ?snapshot <{ProvEntity.iri_specialization_of}> <{self.res}>;
                        <{ProvEntity.iri_generated_at_time}> ?time.
                    OPTIONAL {{
                        ?snapshot <{ProvEntity.iri_has_update_query}> ?updateQuery.
                    }}
                }}
            }}
        """
        results = Sparql(query_snapshots, config=self.config).run_select_query()
        bindings = results['results']['bindings']
        if not bindings:
            return set(), set()
        start_dt = _parse_datetime(time_start)
        end_dt = _parse_datetime(time_end)
        parsed = [(b, _parse_datetime(b['time']['value'])) for b in bindings]
        first_snapshot_dt = min(dt for _, dt in parsed)
        # Entity created after time_start: everything it holds at time_end
        # counts as an addition, nothing as a deletion.
        if first_snapshot_dt > start_dt:
            entity_graphs, _, _ = self._get_entity_state_at_time(
                (time_end, time_end), False
            )
            if not entity_graphs:
                return set(), set()
            state_at_end = next(iter(entity_graphs.values()))
            return state_at_end, set()
        # Snapshots strictly after time_start up to and including time_end,
        # oldest first, restricted to those that carry an update query.
        relevant = sorted(
            ((b, dt) for b, dt in parsed
             if start_dt < dt <= end_dt
             and 'updateQuery' in b and 'value' in b['updateQuery']),
            key=lambda x: x[1],
        )
        return _compose_update_queries([b['updateQuery']['value'] for b, _ in relevant])

424 

425 def _collect_all_related_entities_states_at_time( 

426 self, 

427 histories: dict, 

428 time: tuple[str | None, str | None], 

429 include_prov_metadata: bool 

430 ) -> None: 

431 main_entity = AgnosticEntity(self.res, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False) 

432 entity_graphs, entity_snapshots, other_snapshots_metadata = main_entity._get_entity_state_at_time(time, include_prov_metadata) 

433 histories[self.res] = (entity_graphs, entity_snapshots, other_snapshots_metadata) 

434 

435 processed_entities = {self.res} 

436 

437 if self.include_related_objects: 

438 self._collect_related_objects_states_at_time(self.res, processed_entities, histories, time, include_prov_metadata) 

439 

440 if self.include_merged_entities: 

441 self._collect_merged_entities_states_at_time(self.res, processed_entities, histories, time, include_prov_metadata) 

442 

443 if self.include_reverse_relations: 

444 self._collect_reverse_relations_states_at_time(self.res, processed_entities, histories, time, include_prov_metadata) 

445 

446 def _collect_related_objects_states_at_time( 

447 self, 

448 entity_uri: str, 

449 processed_entities: set[str], 

450 histories: dict, 

451 time: tuple[str | None, str | None], 

452 include_prov_metadata: bool, 

453 depth: int | None = None 

454 ) -> None: 

455 if depth is not None and depth <= 0: 

456 return 

457 

458 next_depth = None if depth is None else depth - 1 

459 

460 entity_graphs = histories[entity_uri][0] if entity_uri in histories else None 

461 if not entity_graphs: 

462 return 

463 

464 for obj_uri in _find_related_object_uris(entity_uri, entity_graphs): 

465 if obj_uri not in processed_entities: 

466 processed_entities.add(obj_uri) 

467 agnostic_entity = AgnosticEntity(obj_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False) 

468 entity_graphs_new, entity_snapshots, other_snapshots_metadata = agnostic_entity._get_entity_state_at_time(time, include_prov_metadata) 

469 histories[obj_uri] = (entity_graphs_new, entity_snapshots, other_snapshots_metadata) 

470 self._collect_related_objects_states_at_time(obj_uri, processed_entities, histories, time, include_prov_metadata, next_depth) 

471 

472 def _collect_merged_entities_states_at_time( 

473 self, 

474 entity_uri: str, 

475 processed_entities: set[str], 

476 histories: dict, 

477 time: tuple[str | None, str | None], 

478 include_prov_metadata: bool, 

479 depth: int | None = None 

480 ) -> None: 

481 if depth is not None and depth <= 0: 

482 return 

483 

484 next_depth = None if depth is None else depth - 1 

485 

486 merged_entities = self._find_merged_entities(entity_uri) 

487 

488 for merged_entity_uri in merged_entities: 

489 if merged_entity_uri not in processed_entities: 

490 processed_entities.add(merged_entity_uri) 

491 agnostic_entity = AgnosticEntity(merged_entity_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False) 

492 entity_graphs, entity_snapshots, other_snapshots_metadata = agnostic_entity._get_entity_state_at_time(time, include_prov_metadata) 

493 histories[merged_entity_uri] = (entity_graphs, entity_snapshots, other_snapshots_metadata) 

494 self._collect_merged_entities_states_at_time(merged_entity_uri, processed_entities, histories, time, include_prov_metadata, next_depth) 

495 

496 def _collect_reverse_relations_states_at_time( 

497 self, 

498 entity_uri: str, 

499 processed_entities: set[str], 

500 histories: dict, 

501 time: tuple[str | None, str | None], 

502 include_prov_metadata: bool, 

503 depth: int | None = None 

504 ) -> None: 

505 if depth is not None and depth <= 0: 

506 return 

507 

508 next_depth = None if depth is None else depth - 1 

509 

510 reverse_related_entities = self._find_reverse_related_entities(entity_uri) 

511 

512 for reverse_entity_uri in reverse_related_entities: 

513 if reverse_entity_uri not in processed_entities: 

514 processed_entities.add(reverse_entity_uri) 

515 agnostic_entity = AgnosticEntity(reverse_entity_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False) 

516 entity_graphs, entity_snapshots, other_snapshots_metadata = agnostic_entity._get_entity_state_at_time(time, include_prov_metadata) 

517 histories[reverse_entity_uri] = (entity_graphs, entity_snapshots, other_snapshots_metadata) 

518 self._collect_reverse_relations_states_at_time(reverse_entity_uri, processed_entities, histories, time, include_prov_metadata, next_depth) 

519 

    def _get_merged_histories_at_time(
        self,
        histories: dict,
        include_prov_metadata: bool
    ) -> tuple:
        """Merge related-entity interval states into the main entity's times.

        For each main-entity timestamp, a related entity contributes the
        quads of its state at that exact timestamp or, failing that, of its
        latest state before it; entities with no state yet are skipped.
        Returns (merged_histories, entity_snapshots_metadata,
        other_snapshots_metadata).
        """
        entity_histories = {}
        entity_snapshots_metadata = {}
        other_snapshots_metadata = {} if include_prov_metadata else None

        for entity_uri, (entity_graphs, entity_snapshots, other_snapshots) in histories.items():
            entity_histories[entity_uri] = entity_graphs
            entity_snapshots_metadata[entity_uri] = entity_snapshots
            if include_prov_metadata and other_snapshots and other_snapshots_metadata is not None:
                other_snapshots_metadata[entity_uri] = other_snapshots

        # The merged timeline is driven by the main entity's timestamps.
        main_entity_times = sorted(
            set(entity_histories[self.res].keys()), key=lambda x: _parse_datetime(x)
        )

        merged_histories = {self.res: {}}

        # Pre-sort each related entity's timestamps once (ascending).
        related_sorted_times = {}
        for entity_uri, graphs_at_times in entity_histories.items():
            if entity_uri == self.res:
                continue
            related_sorted_times[entity_uri] = sorted(
                ((t, _parse_datetime(t)) for t in graphs_at_times),
                key=lambda x: x[1]
            )

        for timestamp in main_entity_times:
            merged_set = set(entity_histories[self.res][timestamp])
            timestamp_dt = _parse_datetime(timestamp)

            for entity_uri, sorted_times in related_sorted_times.items():
                graphs_at_times = entity_histories[entity_uri]
                if timestamp in graphs_at_times:
                    # Exact-timestamp match takes priority.
                    related_quads = graphs_at_times[timestamp]
                else:
                    # Fall back to the latest state at or before timestamp.
                    relevant_time = None
                    for rt, rt_dt in sorted_times:
                        if rt_dt <= timestamp_dt:
                            relevant_time = rt
                        else:
                            break
                    if relevant_time:
                        related_quads = graphs_at_times[relevant_time]
                    else:
                        continue
                merged_set.update(related_quads)

            merged_histories[self.res][timestamp] = merged_set

        return merged_histories, entity_snapshots_metadata, other_snapshots_metadata

574 

    def _get_entity_state_at_time(
        self,
        time: tuple[str | None, str | None],
        include_prov_metadata: bool
    ) -> tuple:
        """Reconstruct the entity's graph(s) within the (start, end) interval.

        Queries all provenance snapshots, picks those inside the interval
        (falling back to the latest one at or before the interval start),
        and rolls the current dataset state back to each picked snapshot by
        replaying the update queries of every later snapshot.

        Returns (entity_graphs, entity_snapshots, other_snapshots_metadata)
        where entity_graphs maps normalized timestamp -> quad set.
        """
        other_snapshots_metadata = {}
        is_quadstore = self.config["provenance"]["is_quadstore"]
        graph_statement = f"GRAPH <{self.res}/prov/>" if is_quadstore else ""
        if include_prov_metadata:
            query_snapshots = f"""
                SELECT ?snapshot ?time ?responsibleAgent ?updateQuery ?primarySource ?description ?invalidatedAtTime
                WHERE {{
                    {graph_statement}
                    {{
                        ?snapshot <{ProvEntity.iri_specialization_of}> <{self.res}>;
                            <{ProvEntity.iri_generated_at_time}> ?time;
                            <{ProvEntity.iri_was_attributed_to}> ?responsibleAgent.
                        OPTIONAL {{
                            ?snapshot <{ProvEntity.iri_invalidated_at_time}> ?invalidatedAtTime.
                        }}
                        OPTIONAL {{
                            ?snapshot <{ProvEntity.iri_description}> ?description.
                        }}
                        OPTIONAL {{
                            ?snapshot <{ProvEntity.iri_has_update_query}> ?updateQuery.
                        }}
                        OPTIONAL {{
                            ?snapshot <{ProvEntity.iri_had_primary_source}> ?primarySource.
                        }}
                    }}
                }}
            """
        else:
            query_snapshots = f"""
                SELECT ?snapshot ?time ?updateQuery
                WHERE {{
                    {graph_statement}
                    {{
                        ?snapshot <{ProvEntity.iri_specialization_of}> <{self.res}>;
                            <{ProvEntity.iri_generated_at_time}> ?time.
                        OPTIONAL {{
                            ?snapshot <{ProvEntity.iri_has_update_query}> ?updateQuery.
                        }}
                    }}
                }}
            """
        results = Sparql(query_snapshots, config=self.config).run_select_query()
        bindings = results['results']['bindings']
        if not bindings:
            return {}, {}, other_snapshots_metadata
        # Newest first.
        sorted_results = sorted(bindings, key=lambda x: _parse_datetime(x['time']['value']), reverse=True)
        relevant_results = _filter_timestamps_by_interval(time, sorted_results, time_index='time')
        if include_prov_metadata:
            # Snapshots outside the interval are reported as metadata only.
            relevant_snapshot_uris = {relevant_result['snapshot']['value'] for relevant_result in relevant_results}
            other_snapshots = [snapshot for snapshot in bindings if snapshot['snapshot']['value'] not in relevant_snapshot_uris]
            for other_snapshot in other_snapshots:
                snapshot_uri = other_snapshot['snapshot']['value']
                other_snapshots_metadata[snapshot_uri] = {
                    "generatedAtTime": other_snapshot['time']['value'],
                    "invalidatedAtTime": other_snapshot.get('invalidatedAtTime', {}).get('value'),
                    "wasAttributedTo": other_snapshot['responsibleAgent']['value'],
                    "hasUpdateQuery": other_snapshot.get('updateQuery', {}).get('value'),
                    "hadPrimarySource": other_snapshot.get('primarySource', {}).get('value'),
                    "description": other_snapshot.get('description', {}).get('value')
                }
        if not relevant_results:
            # Nothing inside the interval: fall back to the latest snapshot
            # generated at or before the interval start, if any.
            interval_start = _parse_datetime(time[0]) if time[0] else None
            if interval_start:
                earlier_snapshots = [r for r in bindings if _parse_datetime(r['time']['value']) <= interval_start]
                if earlier_snapshots:
                    latest_snapshot = max(earlier_snapshots, key=lambda x: _parse_datetime(x['time']['value']))
                    relevant_results = [latest_snapshot]
                else:
                    return {}, {}, other_snapshots_metadata
            else:
                return {}, {}, other_snapshots_metadata
        entity_snapshots = {}
        entity_graphs: dict[str, set[tuple[str, ...]]] = {}
        entity_quads = self._query_dataset(self.res)
        sorted_parsed = [(r, _parse_datetime(r['time']['value'])) for r in sorted_results]
        last_idx = len(relevant_results) - 1
        for i, relevant_result in enumerate(relevant_results):
            relevant_result_time = relevant_result['time']['value']
            relevant_result_dt = _parse_datetime(relevant_result_time)
            # Update queries of every snapshot newer than this one; replaying
            # them rolls the current state back to this snapshot's time.
            update_parts = [
                r['updateQuery']['value']
                for r, r_dt in sorted_parsed
                if 'updateQuery' in r and 'value' in r['updateQuery'] and r_dt > relevant_result_dt
            ]
            # The final iteration may mutate entity_quads in place; earlier
            # iterations work on a copy so later ones start from the original.
            entity_present_graph = entity_quads if i == last_idx else set(entity_quads)
            if update_parts:
                self._manage_update_queries(entity_present_graph, ";".join(update_parts))
            timestamp_key = convert_to_datetime(relevant_result_time, stringify=True)
            entity_graphs[timestamp_key] = entity_present_graph  # type: ignore[index]
            if include_prov_metadata:
                snapshot_uri = relevant_result['snapshot']['value']
                entity_snapshots[snapshot_uri] = {
                    "generatedAtTime": relevant_result_time,
                    "invalidatedAtTime": relevant_result.get('invalidatedAtTime', {}).get('value'),
                    "wasAttributedTo": relevant_result['responsibleAgent']['value'],
                    "hasUpdateQuery": relevant_result.get('updateQuery', {}).get('value'),
                    "hadPrimarySource": relevant_result.get('primarySource', {}).get('value'),
                    "description": relevant_result.get('description', {}).get('value')
                }
        return entity_graphs, entity_snapshots, other_snapshots_metadata

680 

    def _include_prov_metadata(self, triples_generated_at_time: list, current_state: set[tuple[str, ...]]) -> dict:
        """Assemble {self.res: {snapshot_uri: properties}} provenance metadata.

        *triples_generated_at_time* supplies one (snapshot, generatedAtTime)
        quad per snapshot; *current_state* supplies the values of the other
        provenance properties. Returns {} when self.res itself is typed as a
        prov Entity.
        """
        res_n3 = f"<{self.res}>"
        entity_n3 = f"<{ProvEntity.iri_entity}>"
        # A resource typed as a prov Entity gets no metadata of its own.
        for quad in current_state:
            if quad[0] == res_n3 and quad[1] == _RDF_TYPE_N3 and quad[2] == entity_n3:
                return {}
        # Maps N3 predicate -> output key name.
        prov_properties = {
            f"<{ProvEntity.iri_invalidated_at_time}>": "invalidatedAtTime",
            f"<{ProvEntity.iri_was_attributed_to}>": "wasAttributedTo",
            f"<{ProvEntity.iri_had_primary_source}>": "hadPrimarySource",
            f"<{ProvEntity.iri_description}>": "description",
            f"<{ProvEntity.iri_has_update_query}>": "hasUpdateQuery",
            f"<{ProvEntity.iri_was_derived_from}>": "wasDerivedFrom"
        }
        prov_metadata: dict = {
            self.res: {}
        }
        # Seed one record per snapshot, keyed by snapshot URI.
        for triple in triples_generated_at_time:
            time = convert_to_datetime(_n3_value(triple[2]), stringify=True)
            snapshot_uri_str = _n3_value(triple[0])
            prov_metadata[self.res][snapshot_uri_str] = {
                "generatedAtTime": time,
                "invalidatedAtTime": None,
                "wasAttributedTo": None,
                "hadPrimarySource": None,
                "description": None,
                "hasUpdateQuery": None,
                "wasDerivedFrom": []
            }
        # Index provenance property values by subject for O(1) lookups.
        prov_prop_n3_set = set(prov_properties)
        index: dict[str, dict[str, list[str]]] = {}
        for quad in current_state:
            if quad[1] in prov_prop_n3_set:
                index.setdefault(quad[0], {}).setdefault(quad[1], []).append(_n3_value(quad[2]))
        for metadata in dict(prov_metadata).values():
            for se_uri_str, snapshot_data in metadata.items():
                se_n3 = f"<{se_uri_str}>"
                se_props = index.get(se_n3, {})
                for prov_prop_n3, abbr in prov_properties.items():
                    for value in se_props.get(prov_prop_n3, ()):
                        if abbr == "wasDerivedFrom":
                            # Multi-valued: collect every derivation source.
                            snapshot_data[abbr].append(value)
                        else:
                            snapshot_data[abbr] = value
                # Sort for a deterministic ordering of derivation sources.
                if isinstance(snapshot_data.get("wasDerivedFrom"), list):
                    snapshot_data["wasDerivedFrom"] = sorted(snapshot_data["wasDerivedFrom"])

        return prov_metadata

729 

    def _get_entity_current_state(self, include_prov_metadata: bool = False) -> list:
        """Fetch the entity's current quads together with its provenance.

        Returns a list [graphs, metadata, prov_quads] where graphs maps
        self.res to {snapshot_time: quads-or-None}: only the most recent
        snapshot time carries the current dataset quads, older times are
        left as None placeholders for _get_old_graphs to fill in. When no
        provenance exists the list is just [graphs, {}].
        """
        entity_current_state: list = [{self.res: {}}]
        prov_quads = self._query_provenance(include_prov_metadata)
        if len(prov_quads) == 0:
            entity_current_state.append({})
            return entity_current_state
        dataset_quads = self._query_dataset(self.res)
        gen_at_time_n3 = f"<{ProvEntity.iri_generated_at_time}>"
        triples_generated_at_time = [
            quad for quad in prov_quads if quad[1] == gen_at_time_n3
        ]
        # Track the latest generation time while registering every snapshot
        # time as a placeholder.
        most_recent_time = None
        most_recent_time_str: str | None = None
        for quad in triples_generated_at_time:
            snapshot_time_str = _n3_value(quad[2])
            snapshot_date_time = _parse_datetime(snapshot_time_str)
            if most_recent_time:
                if snapshot_date_time > most_recent_time:
                    most_recent_time = snapshot_date_time
                    most_recent_time_str = snapshot_time_str
            else:
                most_recent_time = snapshot_date_time
                most_recent_time_str = snapshot_time_str
            entity_current_state[0][self.res][snapshot_time_str] = None
        # Only the newest snapshot holds the actual current dataset state.
        entity_current_state[0][self.res][most_recent_time_str] = dataset_quads
        if include_prov_metadata:
            prov_metadata = self._include_prov_metadata(
                triples_generated_at_time, prov_quads
            )
            entity_current_state.append(prov_metadata)
        else:
            entity_current_state.append(None)
        # Provenance quads ride along at index 2 for _get_old_graphs.
        entity_current_state.append(prov_quads)
        return entity_current_state

764 

    def _get_old_graphs(self, entity_current_state: list) -> list:
        """Reconstruct every past version by replaying update queries backwards.

        Takes the output of _get_entity_current_state, pops the provenance
        quads appended at index 2, and derives each older version's graph
        from the next (newer) version's graph by applying that snapshot's
        update query in reverse. Timestamp keys are normalized at the end.
        """
        prov_quads = entity_current_state.pop(2) if len(entity_current_state) > 2 else set()
        snapshot_update_queries = _extract_snapshot_update_queries(prov_quads)
        # Newest first: each older graph is derived from the one after it.
        ordered_data: list[tuple[str, set[tuple[str, ...]]]] = sorted(
            entity_current_state[0][self.res].items(),
            key=lambda x: _parse_datetime(str(x[0])),
            reverse=True
        )
        if not ordered_data:
            return entity_current_state
        for index, date_graph in enumerate(ordered_data):
            if index > 0:
                next_snapshot = ordered_data[index-1][0]
                previous_graph = set(entity_current_state[0][self.res][next_snapshot])
                update_query = snapshot_update_queries.get(str(next_snapshot))
                if update_query is None:
                    # No update query recorded: the older version is identical
                    # to the newer one.
                    entity_current_state[0][self.res][date_graph[0]] = previous_graph
                else:
                    self._manage_update_queries(previous_graph, update_query)
                    entity_current_state[0][self.res][date_graph[0]] = previous_graph
        # Re-key every entry with the canonical timestamp string.
        for time in list(entity_current_state[0][self.res]):
            quad_set = entity_current_state[0][self.res].pop(time)
            time_str = str(convert_to_datetime(str(time), stringify=True))
            entity_current_state[0][self.res][time_str] = quad_set
        return entity_current_state

790 

791 def iter_versions(self): 

792 prov_quads = self._query_provenance(include_prov_metadata=False) 

793 if len(prov_quads) == 0: 

794 return 

795 dataset_quads = self._query_dataset(self.res) 

796 working: set[tuple[str, ...]] = set(dataset_quads) 

797 snapshots = _extract_snapshot_update_queries(prov_quads) 

798 ordered = sorted(snapshots.items(), key=lambda x: _parse_datetime(x[0]), reverse=True) 

799 for i, (time_str, _update_query) in enumerate(ordered): 

800 if i > 0: 

801 prev_update = ordered[i - 1][1] 

802 if prev_update is not None: 

803 self._manage_update_queries(working, prev_update) 

804 normalized = str(convert_to_datetime(time_str, stringify=True)) 

805 yield normalized, set(working) 

806 

807 @classmethod 

808 def _manage_update_queries(cls, graph: set, update_query: str) -> None: 

809 operations = _fast_parse_update(update_query) 

810 for operation_type, quads in operations: 

811 if operation_type == 'DeleteData': 

812 for quad in quads: 

813 graph.add(quad) 

814 elif operation_type == 'InsertData': 

815 for quad in quads: 

816 graph.discard(quad) 

817 

818 def _query_dataset(self, entity_uri: str | None = None) -> set[tuple[str, ...]]: 

819 entity_uri = self.res if entity_uri is None else entity_uri 

820 

821 is_quadstore = self.config['dataset']['is_quadstore'] 

822 

823 if is_quadstore: 

824 query_dataset = f""" 

825 SELECT ?s ?p ?o ?g 

826 WHERE {{ 

827 GRAPH ?g {{ 

828 VALUES ?s {{<{entity_uri}>}} 

829 ?s ?p ?o 

830 }} 

831 }} 

832 """ 

833 else: 

834 query_dataset = f""" 

835 SELECT ?s ?p ?o 

836 WHERE {{ 

837 VALUES ?s {{<{entity_uri}>}} 

838 ?s ?p ?o 

839 }} 

840 """ 

841 

842 return Sparql(query_dataset, config=self.config).run_select_to_quad_set() 

843 

    def _query_provenance(self, include_prov_metadata:bool=False) -> set[tuple[str, ...]]:
        """Retrieve the provenance quads describing this entity's snapshots.

        :param include_prov_metadata: when True, require and fetch the full
            descriptive predicates (agent, description, primary source,
            invalidation time) in addition to the traversal predicates;
            when False, fetch only what version reconstruction needs.
        :return: the matching provenance statements as a set of quads.
        """
        if include_prov_metadata:
            # Snapshots must carry agent, generation time AND description;
            # all listed predicates are then selected for each snapshot.
            query_provenance = f"""
                SELECT ?s ?p ?o WHERE {{
                    ?s <{ProvEntity.iri_specialization_of}> <{self.res}>;
                        <{ProvEntity.iri_was_attributed_to}> ?_agent;
                        <{ProvEntity.iri_generated_at_time}> ?_t;
                        <{ProvEntity.iri_description}> ?_desc.
                    ?s ?p ?o.
                    VALUES ?p {{
                        <{ProvEntity.iri_generated_at_time}>
                        <{ProvEntity.iri_was_attributed_to}>
                        <{ProvEntity.iri_had_primary_source}>
                        <{ProvEntity.iri_description}>
                        <{ProvEntity.iri_has_update_query}>
                        <{ProvEntity.iri_invalidated_at_time}>
                        <{ProvEntity.iri_was_derived_from}>
                        <{ProvEntity.iri_specialization_of}>
                    }}
                }}
            """
        else:
            # Minimal variant: only the predicates needed to order snapshots
            # and replay their update queries.
            query_provenance = f"""
                SELECT ?s ?p ?o WHERE {{
                    ?s <{ProvEntity.iri_specialization_of}> <{self.res}>;
                        <{ProvEntity.iri_generated_at_time}> ?_t.
                    ?s ?p ?o.
                    VALUES ?p {{
                        <{ProvEntity.iri_generated_at_time}>
                        <{ProvEntity.iri_has_update_query}>
                        <{ProvEntity.iri_was_derived_from}>
                        <{ProvEntity.iri_specialization_of}>
                    }}
                }}
            """
        return Sparql(query_provenance, config=self.config).run_select_to_quad_set()

880 

881 def _find_merged_entities(self, entity_uri: str) -> set[str]: 

882 merged_entity_uris = set() 

883 query_simple = f""" 

884 SELECT ?merged_entity_uri 

885 WHERE {{ 

886 ?snapshot <{ProvEntity.iri_specialization_of}> <{entity_uri}> . 

887 ?snapshot <{ProvEntity.iri_was_derived_from}> ?derived_snapshot . 

888 ?derived_snapshot <{ProvEntity.iri_specialization_of}> ?merged_entity_uri . 

889 FILTER (?merged_entity_uri != <{entity_uri}>) 

890 }} 

891 """ 

892 try: 

893 results = Sparql(query_simple, config=self.config).run_select_query() 

894 bindings = results.get('results', {}).get('bindings', []) 

895 for binding in bindings: 

896 if 'merged_entity_uri' in binding and 'value' in binding['merged_entity_uri']: 

897 merged_entity_uris.add(binding['merged_entity_uri']['value']) 

898 except Exception as e: 

899 print(f"Error querying for merged entities for {entity_uri}: {e}") 

900 

901 return merged_entity_uris 

902 

903 def _find_reverse_related_entities(self, entity_uri: str) -> set[str]: 

904 reverse_related_entity_uris = set() 

905 

906 is_quadstore = self.config['dataset']['is_quadstore'] 

907 

908 if is_quadstore: 

909 query = f""" 

910 SELECT ?subject 

911 WHERE {{ 

912 GRAPH ?g {{ 

913 ?subject ?predicate <{entity_uri}> . 

914 FILTER(?predicate != <{_RDF_TYPE}> && !strstarts(str(?predicate), "{ProvEntity.PROV}")) 

915 }} 

916 }} 

917 """ 

918 else: 

919 query = f""" 

920 SELECT ?subject 

921 WHERE {{ 

922 ?subject ?predicate <{entity_uri}> . 

923 FILTER(?predicate != <{_RDF_TYPE}> && !strstarts(str(?predicate), "{ProvEntity.PROV}")) 

924 }} 

925 """ 

926 

927 try: 

928 results = Sparql(query, config=self.config).run_select_query() 

929 bindings = results.get('results', {}).get('bindings', []) 

930 for binding in bindings: 

931 if 'subject' in binding and 'value' in binding['subject']: 

932 subject_uri = binding['subject']['value'] 

933 if subject_uri != entity_uri: 

934 reverse_related_entity_uris.add(subject_uri) 

935 except Exception as e: 

936 print(f"Error querying for reverse related entities for {entity_uri}: {e}") 

937 

938 return reverse_related_entity_uris 

939 

940def _filter_timestamps_by_interval(interval: tuple[str | None, str | None] | None, iterator: list, time_index: str | None = None) -> list: 

941 if interval: 

942 after_time = _parse_datetime(interval[0]) if interval[0] else None 

943 before_time = _parse_datetime(interval[1]) if interval[1] else None 

944 relevant_timestamps = [] 

945 for timestamp in iterator: 

946 if time_index is not None and time_index in timestamp: 

947 time_binding = timestamp[time_index] 

948 if 'value' in time_binding: 

949 time_str = time_binding['value'] 

950 time = _parse_datetime(time_str) 

951 else: 

952 continue 

953 else: 

954 continue 

955 if after_time and before_time: 

956 if after_time <= time <= before_time: 

957 relevant_timestamps.append(timestamp) 

958 elif after_time and not before_time: 

959 if time >= after_time: 

960 relevant_timestamps.append(timestamp) 

961 elif before_time and not after_time: 

962 if time <= before_time: 

963 relevant_timestamps.append(timestamp) 

964 else: 

965 relevant_timestamps.append(timestamp) 

966 else: 

967 relevant_timestamps = iterator.copy() 

968 return relevant_timestamps