Coverage for src / time_agnostic_library / agnostic_entity.py: 100%
563 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-03-21 11:54 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-03-21 11:54 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7import re
8from datetime import datetime
9from functools import lru_cache
11from time_agnostic_library.prov_entity import ProvEntity
12from time_agnostic_library.sparql import Sparql, _n3_value
13from time_agnostic_library.support import convert_to_datetime
# Start of a SPARQL "DELETE DATA" / "INSERT DATA" operation keyword.
_OPERATION_RE = re.compile(r'(DELETE|INSERT)\s+DATA', re.IGNORECASE)
# Opening of a "GRAPH <uri> {" block; captures the graph IRI.
_GRAPH_BLOCK_RE = re.compile(r'GRAPH\s*<([^>]+)>\s*\{', re.IGNORECASE)

# Tokenizer for the RDF terms inside a GRAPH block. Alternatives and their
# capture groups: IRI (1), typed literal (2 = lexical form, 3 = datatype IRI),
# language-tagged literal (4 = lexical form, 5 = tag), plain double-quoted
# literal (6), plain single-quoted literal (7), blank node label (8).
_RDF_TERM_RE = re.compile(
    r'<([^>]+)>'
    r'|"((?:[^"\\]|\\.)*)"\^\^<([^>]+)>'
    r'|"((?:[^"\\]|\\.)*)"@([a-zA-Z][\w-]*)'
    r'|"((?:[^"\\]|\\.)*)"'
    r"|'((?:[^'\\]|\\.)*)'"
    r'|(_:\S+)',
    re.DOTALL,
)

# A single backslash escape inside a literal; unknown escapes resolve to the
# escaped character itself (see _unescape_literal).
_ESCAPE_CHAR_RE = re.compile(r'\\(.)')
_ESCAPE_CHAR_MAP = {'n': '\n', 'r': '\r', 't': '\t'}

# rdf:type predicate IRI, used to exclude typing triples from
# related-object discovery.
_RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
def _unescape_literal(s: str) -> str:
    """Resolve backslash escapes (\\n, \\r, \\t, and escaped quotes) in *s*.

    Unknown escape sequences collapse to the escaped character itself.
    """
    if '\\' not in s:
        # Fast path: nothing to unescape.
        return s

    def _resolve(match: re.Match) -> str:
        escaped = match.group(1)
        return _ESCAPE_CHAR_MAP.get(escaped, escaped)

    return _ESCAPE_CHAR_RE.sub(_resolve, s)
def _normalize_literal(raw: str) -> str:
    """Canonicalize a literal's lexical form for stable N3 serialization.

    The raw form is first unescaped, then re-escaped with a fixed escape
    vocabulary so equal values always serialize identically.
    """
    text = _unescape_literal(raw)
    # Backslash must be re-escaped first so the later escapes survive intact.
    for plain, escaped in (('\\', '\\\\'), ('"', '\\"'), ('\n', '\\n'), ('\r', '\\r')):
        text = text.replace(plain, escaped)
    return text
def _regex_match_to_n3(match: re.Match) -> str:
    """Render a single _RDF_TERM_RE match as an N3 term string."""
    if (uri := match.group(1)) is not None:
        return f"<{uri}>"
    if (typed := match.group(2)) is not None:
        return f'"{_normalize_literal(typed)}"^^<{match.group(3)}>'
    if (tagged := match.group(4)) is not None:
        return f'"{_normalize_literal(tagged)}"@{match.group(5)}'
    if (dquoted := match.group(6)) is not None:
        return f'"{_normalize_literal(dquoted)}"'
    if (squoted := match.group(7)) is not None:
        return f'"{_normalize_literal(squoted)}"'
    # Only remaining alternative in the pattern: a blank node label.
    return match.group(8)
71_BRACE_OR_QUOTE_RE = re.compile(r'[{}\'"]')
def _find_matching_close_brace(text: str, start: int) -> int:
    """Return the index of the '}' closing the block opened just before *start*.

    Braces that occur inside quoted literals are ignored. Depth starts at 1
    (one '{' is assumed already consumed). Returns len(text) when the block
    is never closed.
    """
    pos = start
    length = len(text)
    depth = 1
    while pos < length:
        m = _BRACE_OR_QUOTE_RE.search(text, pos)
        if m is None:
            # No more structural characters: treat as unterminated block.
            return length
        pos = m.start()
        char = text[pos]
        if char == '{':
            depth += 1
            pos += 1
        elif char == '}':
            depth -= 1
            if depth == 0:
                return pos
            pos += 1
        else:
            # A quoted literal: skip to its closing quote, honouring escapes.
            quote_char = char
            pos += 1
            while pos < length:
                q = text.find(quote_char, pos)
                if q == -1:
                    # Unterminated literal: abandon the scan.
                    pos = length
                    break
                # Count backslashes immediately preceding the candidate quote;
                # an even count means the quote is NOT escaped and ends the literal.
                num_backslashes = 0
                check = q - 1
                while check >= start and text[check] == '\\':
                    num_backslashes += 1
                    check -= 1
                if num_backslashes % 2 == 0:
                    pos = q + 1
                    break
                # Escaped quote: keep scanning within the literal.
                pos = q + 1
    return length
def _fast_parse_update(update_query: str) -> list[tuple[str, list[tuple[str, str, str, str]]]]:
    """Parse a SPARQL update string into a list of (operation_type, quads).

    operation_type is 'DeleteData' or 'InsertData'. Each quad is a 4-tuple of
    N3-formatted strings (subject, predicate, object, graph). Only triples
    inside explicit GRAPH <...> { ... } blocks are collected; the regex-based
    scan trades full SPARQL parsing for speed.
    """
    operations: list[tuple[str, list[tuple[str, str, str, str]]]] = []
    operation_matches = list(_OPERATION_RE.finditer(update_query))
    query_len = len(update_query)

    for i, operation_match in enumerate(operation_matches):
        operation_type = 'DeleteData' if operation_match.group(1).upper() == 'DELETE' else 'InsertData'

        # Each operation's body runs up to the next DELETE/INSERT DATA
        # keyword, or to the end of the query for the last operation.
        op_start = operation_match.end()
        op_end = operation_matches[i + 1].start() if i + 1 < len(operation_matches) else query_len
        operation_body = update_query[op_start:op_end]

        quads: list[tuple[str, str, str, str]] = []
        for graph_match in _GRAPH_BLOCK_RE.finditer(operation_body):
            graph_n3 = f"<{graph_match.group(1)}>"
            triples_start = graph_match.end()
            triples_end = _find_matching_close_brace(operation_body, triples_start)
            triples_text = operation_body[triples_start:triples_end]

            # Every three consecutive RDF terms form one triple.
            terms: list[str] = []
            for m in _RDF_TERM_RE.finditer(triples_text):
                terms.append(_regex_match_to_n3(m))
                if len(terms) == 3:
                    quads.append((terms[0], terms[1], terms[2], graph_n3))
                    terms.clear()

        operations.append((operation_type, quads))

    return operations
def _compose_update_queries(
    update_queries: list[str],
) -> tuple[set[tuple[str, ...]], set[tuple[str, ...]]]:
    """Fold a sequence of updates into net (additions, deletions) quad sets.

    A deletion cancels a pending addition of the same quad (and vice versa),
    so quads that are added and later removed leave no trace in either set.
    """
    additions: set[tuple[str, ...]] = set()
    deletions: set[tuple[str, ...]] = set()
    for query in update_queries:
        for operation_type, quads in _fast_parse_update(query):
            if operation_type == 'DeleteData':
                target, cancelled = deletions, additions
            elif operation_type == 'InsertData':
                target, cancelled = additions, deletions
            else:
                continue
            for quad in quads:
                if quad in cancelled:
                    # The opposite set already holds this quad: cancel it out.
                    cancelled.discard(quad)
                else:
                    target.add(quad)
    return additions, deletions
165CONFIG_PATH = "./config.json"
@lru_cache(maxsize=4096)
def _parse_datetime(time_string: str) -> datetime:
    """Parse *time_string* into a datetime via convert_to_datetime, memoized.

    Raises:
        AssertionError: if convert_to_datetime does not yield a datetime.
    """
    result = convert_to_datetime(time_string)
    # Explicit check instead of a bare `assert`: asserts are stripped under
    # `python -O`, which would let a non-datetime value propagate silently.
    # The exception type is kept as AssertionError for backward compatibility.
    if not isinstance(result, datetime):
        raise AssertionError(
            f"convert_to_datetime({time_string!r}) did not return a datetime"
        )
    return result
# Pre-rendered N3 forms of the two provenance predicates consulted most often.
_GEN_AT_TIME_N3 = f"<{ProvEntity.iri_generated_at_time}>"
_HAS_UQ_N3 = f"<{ProvEntity.iri_has_update_query}>"
def _extract_snapshot_update_queries(quads: set[tuple[str, ...]]) -> dict[str, str | None]:
    """Map each snapshot's generation time to its update query (or None).

    Scans provenance quads for generatedAtTime / hasUpdateQuery predicates,
    groups them per snapshot subject, and keeps only snapshots that actually
    carry a generation time.
    """
    interesting = (_GEN_AT_TIME_N3, _HAS_UQ_N3)
    per_snapshot: dict[str, dict[str, str]] = {}
    for subject, predicate, obj, *_rest in quads:
        if predicate in interesting:
            per_snapshot.setdefault(subject, {})[predicate] = _n3_value(obj)
    return {
        props[_GEN_AT_TIME_N3]: props.get(_HAS_UQ_N3)
        for props in per_snapshot.values()
        if _GEN_AT_TIME_N3 in props
    }
# PROV namespace prefix and the N3 form of rdf:type; both are used to filter
# provenance and typing triples out of related-object discovery.
_PROV_PREFIX = ProvEntity.PROV
_RDF_TYPE_N3 = f"<{_RDF_TYPE}>"
def _find_related_object_uris(entity_uri: str, graphs: dict) -> set[str]:
    """Collect IRIs of objects linked from *entity_uri* across all version
    graphs, excluding provenance predicates and rdf:type triples.

    *graphs* maps timestamps to quad sets (a value may be None for versions
    that were not materialized).
    """
    subject_n3 = f"<{entity_uri}>"
    return {
        _n3_value(obj)
        for quad_set in graphs.values()
        if quad_set is not None
        for subj, pred, obj, *_rest in quad_set
        if subj == subject_n3
        and obj.startswith('<')
        and _PROV_PREFIX not in pred
        and pred != _RDF_TYPE_N3
    }
207class AgnosticEntity:
208 def __init__(self, res:str, config:dict, include_related_objects:bool=False, include_merged_entities:bool=False, include_reverse_relations:bool=False):
209 self.res = res
210 self.include_related_objects = include_related_objects
211 self.include_merged_entities = include_merged_entities
212 self.include_reverse_relations = include_reverse_relations
213 self.config = config
def get_history(self, include_prov_metadata: bool = False) -> tuple:
    """Reconstruct every version of the entity, optionally with provenance
    metadata; related/merged/reverse-related entities are merged in when the
    corresponding flags were set at construction time.
    """
    wants_neighbours = (
        self.include_related_objects
        or self.include_merged_entities
        or self.include_reverse_relations
    )
    if wants_neighbours:
        histories: dict = {}
        self._collect_all_related_entities_histories(histories, include_prov_metadata)
        return self._get_merged_histories(histories, include_prov_metadata)

    entity_history = self._get_old_graphs(
        self._get_entity_current_state(include_prov_metadata)
    )
    # Normalize unreconstructed versions (None) to empty quad sets.
    for time_dict in entity_history[0].values():
        for ts, quad_set in time_dict.items():
            if quad_set is None:
                time_dict[ts] = set()
    return tuple(entity_history)
def _collect_all_related_entities_histories(
    self,
    histories: dict,
    include_prov_metadata: bool
) -> None:
    """Populate *histories* with the full history of this entity and, per the
    constructor flags, of its related objects, merged entities, and
    reverse-related entities.

    *histories* maps entity URI -> (graphs_by_entity, provenance_metadata).
    """
    # The main entity is reconstructed with all flags off so its helper
    # instance does not re-trigger traversal.
    main_entity = AgnosticEntity(self.res, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
    entity_history = main_entity._get_entity_current_state(include_prov_metadata)
    entity_history = main_entity._get_old_graphs(entity_history)
    histories[self.res] = (entity_history[0], entity_history[1])

    processed_entities = {self.res}

    if self.include_related_objects:
        self._collect_related_objects_recursively(self.res, processed_entities, histories, include_prov_metadata)

    if self.include_merged_entities:
        self._collect_merged_entities_recursively(self.res, processed_entities, histories, include_prov_metadata)

    if self.include_reverse_relations:
        self._collect_reverse_relations_recursively(self.res, processed_entities, histories, include_prov_metadata)
def _collect_related_objects_recursively(
    self,
    entity_uri: str,
    processed_entities: set[str],
    histories: dict,
    include_prov_metadata: bool,
    depth: int | None = None
) -> None:
    """Depth-first traversal of objects referenced by *entity_uri*, adding
    each newly seen entity's full history to *histories*.

    depth=None means unbounded recursion; *processed_entities* guards
    against cycles and duplicate work.
    """
    if depth is not None and depth <= 0:
        return

    next_depth = None if depth is None else depth - 1

    entity_graphs = histories[entity_uri][0][entity_uri] if entity_uri in histories else None
    if not entity_graphs:
        return

    for obj_uri in _find_related_object_uris(entity_uri, entity_graphs):
        if obj_uri not in processed_entities:
            processed_entities.add(obj_uri)
            # Child entities are reconstructed with all flags off.
            agnostic_entity = AgnosticEntity(obj_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
            entity_history = agnostic_entity._get_entity_current_state(include_prov_metadata)
            entity_history = agnostic_entity._get_old_graphs(entity_history)
            histories[obj_uri] = (entity_history[0], entity_history[1])
            self._collect_related_objects_recursively(obj_uri, processed_entities, histories, include_prov_metadata, next_depth)
def _collect_merged_entities_recursively(
    self,
    entity_uri: str,
    processed_entities: set[str],
    histories: dict,
    include_prov_metadata: bool,
    depth: int | None = None
) -> None:
    """Depth-first traversal of entities merged into *entity_uri* (found via
    prov:wasDerivedFrom chains), adding each one's full history to
    *histories*. depth=None means unbounded recursion.
    """
    if depth is not None and depth <= 0:
        return

    next_depth = None if depth is None else depth - 1

    merged_entities = self._find_merged_entities(entity_uri)

    for merged_entity_uri in merged_entities:
        if merged_entity_uri not in processed_entities:
            processed_entities.add(merged_entity_uri)
            agnostic_entity = AgnosticEntity(merged_entity_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
            entity_history = agnostic_entity._get_entity_current_state(include_prov_metadata)
            entity_history = agnostic_entity._get_old_graphs(entity_history)
            histories[merged_entity_uri] = (entity_history[0], entity_history[1])
            self._collect_merged_entities_recursively(merged_entity_uri, processed_entities, histories, include_prov_metadata, next_depth)
def _collect_reverse_relations_recursively(
    self,
    entity_uri: str,
    processed_entities: set[str],
    histories: dict,
    include_prov_metadata: bool,
    depth: int | None = None
) -> None:
    """Depth-first traversal of entities that point AT *entity_uri*, adding
    each one's full history to *histories*. depth=None means unbounded
    recursion.
    """
    if depth is not None and depth <= 0:
        return

    next_depth = None if depth is None else depth - 1

    reverse_related_entities = self._find_reverse_related_entities(entity_uri)

    for reverse_entity_uri in reverse_related_entities:
        if reverse_entity_uri not in processed_entities:
            processed_entities.add(reverse_entity_uri)
            agnostic_entity = AgnosticEntity(reverse_entity_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
            entity_history = agnostic_entity._get_entity_current_state(include_prov_metadata)
            entity_history = agnostic_entity._get_old_graphs(entity_history)
            histories[reverse_entity_uri] = (entity_history[0], entity_history[1])
            self._collect_reverse_relations_recursively(reverse_entity_uri, processed_entities, histories, include_prov_metadata, next_depth)
def _get_merged_histories(
    self,
    histories: dict,
    include_prov_metadata: bool
) -> tuple:
    """Merge the per-entity histories into one timeline keyed by the main
    entity's snapshot times.

    For each main-entity timestamp, every related entity contributes its
    most recent version generated at or before that timestamp.
    """
    entity_histories = {}
    metadata = {}
    for entity_uri, (entity_history_dict, entity_metadata) in histories.items():
        entity_histories[entity_uri] = entity_history_dict[entity_uri]
        if include_prov_metadata and entity_metadata:
            metadata[entity_uri] = entity_metadata[entity_uri]

    main_entity_times = sorted(
        entity_histories[self.res].keys(), key=lambda x: _parse_datetime(x)
    )

    merged_histories = {self.res: {}}

    # Pre-sort every related entity's timestamps once, pairing the raw
    # string with its parsed datetime for cheap comparisons below.
    related_sorted_times = {}
    for entity_uri, entity_history in entity_histories.items():
        if entity_uri == self.res:
            continue
        related_sorted_times[entity_uri] = sorted(
            ((t, _parse_datetime(t)) for t in entity_history),
            key=lambda x: x[1]
        )

    for timestamp in main_entity_times:
        merged_set = set(entity_histories[self.res][timestamp])
        timestamp_dt = _parse_datetime(timestamp)

        for entity_uri, sorted_times in related_sorted_times.items():
            # Walk ascending timestamps; the last one not after the main
            # timestamp identifies the version in force at that moment.
            relevant_time = None
            for etime, etime_dt in sorted_times:
                if etime_dt <= timestamp_dt:
                    relevant_time = etime
                else:
                    break
            if relevant_time:
                merged_set.update(entity_histories[entity_uri][relevant_time])

        merged_histories[self.res][timestamp] = merged_set

    return merged_histories, metadata
def get_state_at_time(
    self,
    time: tuple[str | None, str | None],
    include_prov_metadata: bool = False,
) -> tuple:
    """Materialize the entity's state within the (start, end) interval,
    merging neighbouring entities when the constructor flags ask for them.
    """
    follow_neighbours = (
        self.include_related_objects
        or self.include_merged_entities
        or self.include_reverse_relations
    )
    if not follow_neighbours:
        return self._get_entity_state_at_time(time, include_prov_metadata)

    histories: dict = {}
    self._collect_all_related_entities_states_at_time(histories, time, include_prov_metadata)
    return self._get_merged_histories_at_time(histories, include_prov_metadata)
def get_delta(
    self,
    time_start: str,
    time_end: str,
) -> tuple[set[tuple[str, ...]], set[tuple[str, ...]]]:
    """Return the net (additions, deletions) quad sets between two times.

    Only update queries of snapshots generated strictly after *time_start*
    and up to *time_end* contribute. If the entity's first snapshot is after
    *time_start* (it did not exist yet), its whole state at *time_end* is
    returned as additions.
    """
    is_quadstore = self.config["provenance"]["is_quadstore"]
    # On quadstores, provenance lives in a per-entity named graph.
    graph_statement = f"GRAPH <{self.res}/prov/>" if is_quadstore else ""
    query_snapshots = f"""
        SELECT ?time ?updateQuery
        WHERE {{
            {graph_statement}
            {{
                ?snapshot <{ProvEntity.iri_specialization_of}> <{self.res}>;
                    <{ProvEntity.iri_generated_at_time}> ?time.
                OPTIONAL {{
                    ?snapshot <{ProvEntity.iri_has_update_query}> ?updateQuery.
                }}
            }}
        }}
    """
    results = Sparql(query_snapshots, config=self.config).run_select_query()
    bindings = results['results']['bindings']
    if not bindings:
        return set(), set()
    start_dt = _parse_datetime(time_start)
    end_dt = _parse_datetime(time_end)
    parsed = [(b, _parse_datetime(b['time']['value'])) for b in bindings]
    first_snapshot_dt = min(dt for _, dt in parsed)
    if first_snapshot_dt > start_dt:
        # The entity was created after the interval start: everything it
        # contains at time_end counts as an addition.
        entity_graphs, _, _ = self._get_entity_state_at_time(
            (time_end, time_end), False
        )
        if not entity_graphs:
            return set(), set()
        state_at_end = next(iter(entity_graphs.values()))
        return state_at_end, set()
    # Snapshots inside the interval that actually carry an update query,
    # in chronological order.
    relevant = sorted(
        ((b, dt) for b, dt in parsed
         if start_dt < dt <= end_dt
         and 'updateQuery' in b and 'value' in b['updateQuery']),
        key=lambda x: x[1],
    )
    return _compose_update_queries([b['updateQuery']['value'] for b, _ in relevant])
def _collect_all_related_entities_states_at_time(
    self,
    histories: dict,
    time: tuple[str | None, str | None],
    include_prov_metadata: bool
) -> None:
    """Populate *histories* with this entity's state in the *time* interval
    and, per the constructor flags, with the states of its related objects,
    merged entities, and reverse-related entities.

    *histories* maps entity URI -> (graphs, snapshots, other_snapshots).
    """
    # The main entity is reconstructed with all flags off so its helper
    # instance does not re-trigger traversal.
    main_entity = AgnosticEntity(self.res, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
    entity_graphs, entity_snapshots, other_snapshots_metadata = main_entity._get_entity_state_at_time(time, include_prov_metadata)
    histories[self.res] = (entity_graphs, entity_snapshots, other_snapshots_metadata)

    processed_entities = {self.res}

    if self.include_related_objects:
        self._collect_related_objects_states_at_time(self.res, processed_entities, histories, time, include_prov_metadata)

    if self.include_merged_entities:
        self._collect_merged_entities_states_at_time(self.res, processed_entities, histories, time, include_prov_metadata)

    if self.include_reverse_relations:
        self._collect_reverse_relations_states_at_time(self.res, processed_entities, histories, time, include_prov_metadata)
def _collect_related_objects_states_at_time(
    self,
    entity_uri: str,
    processed_entities: set[str],
    histories: dict,
    time: tuple[str | None, str | None],
    include_prov_metadata: bool,
    depth: int | None = None
) -> None:
    """Depth-first traversal of objects referenced by *entity_uri*,
    reconstructing each newly seen entity's state within *time* and storing
    it in *histories*. depth=None means unbounded recursion.
    """
    if depth is not None and depth <= 0:
        return

    next_depth = None if depth is None else depth - 1

    entity_graphs = histories[entity_uri][0] if entity_uri in histories else None
    if not entity_graphs:
        return

    for obj_uri in _find_related_object_uris(entity_uri, entity_graphs):
        if obj_uri not in processed_entities:
            processed_entities.add(obj_uri)
            agnostic_entity = AgnosticEntity(obj_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
            entity_graphs_new, entity_snapshots, other_snapshots_metadata = agnostic_entity._get_entity_state_at_time(time, include_prov_metadata)
            histories[obj_uri] = (entity_graphs_new, entity_snapshots, other_snapshots_metadata)
            self._collect_related_objects_states_at_time(obj_uri, processed_entities, histories, time, include_prov_metadata, next_depth)
def _collect_merged_entities_states_at_time(
    self,
    entity_uri: str,
    processed_entities: set[str],
    histories: dict,
    time: tuple[str | None, str | None],
    include_prov_metadata: bool,
    depth: int | None = None
) -> None:
    """Depth-first traversal of entities merged into *entity_uri*,
    reconstructing each one's state within *time* and storing it in
    *histories*. depth=None means unbounded recursion.
    """
    if depth is not None and depth <= 0:
        return

    next_depth = None if depth is None else depth - 1

    merged_entities = self._find_merged_entities(entity_uri)

    for merged_entity_uri in merged_entities:
        if merged_entity_uri not in processed_entities:
            processed_entities.add(merged_entity_uri)
            agnostic_entity = AgnosticEntity(merged_entity_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
            entity_graphs, entity_snapshots, other_snapshots_metadata = agnostic_entity._get_entity_state_at_time(time, include_prov_metadata)
            histories[merged_entity_uri] = (entity_graphs, entity_snapshots, other_snapshots_metadata)
            self._collect_merged_entities_states_at_time(merged_entity_uri, processed_entities, histories, time, include_prov_metadata, next_depth)
def _collect_reverse_relations_states_at_time(
    self,
    entity_uri: str,
    processed_entities: set[str],
    histories: dict,
    time: tuple[str | None, str | None],
    include_prov_metadata: bool,
    depth: int | None = None
) -> None:
    """Depth-first traversal of entities pointing AT *entity_uri*,
    reconstructing each one's state within *time* and storing it in
    *histories*. depth=None means unbounded recursion.
    """
    if depth is not None and depth <= 0:
        return

    next_depth = None if depth is None else depth - 1

    reverse_related_entities = self._find_reverse_related_entities(entity_uri)

    for reverse_entity_uri in reverse_related_entities:
        if reverse_entity_uri not in processed_entities:
            processed_entities.add(reverse_entity_uri)
            agnostic_entity = AgnosticEntity(reverse_entity_uri, self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
            entity_graphs, entity_snapshots, other_snapshots_metadata = agnostic_entity._get_entity_state_at_time(time, include_prov_metadata)
            histories[reverse_entity_uri] = (entity_graphs, entity_snapshots, other_snapshots_metadata)
            self._collect_reverse_relations_states_at_time(reverse_entity_uri, processed_entities, histories, time, include_prov_metadata, next_depth)
def _get_merged_histories_at_time(
    self,
    histories: dict,
    include_prov_metadata: bool
) -> tuple:
    """Merge the per-entity interval states into one timeline keyed by the
    main entity's timestamps.

    A related entity contributes its graph at exactly the main timestamp if
    present, otherwise its most recent earlier version; entities with no
    version at or before the timestamp contribute nothing.
    """
    entity_histories = {}
    entity_snapshots_metadata = {}
    other_snapshots_metadata = {} if include_prov_metadata else None

    for entity_uri, (entity_graphs, entity_snapshots, other_snapshots) in histories.items():
        entity_histories[entity_uri] = entity_graphs
        entity_snapshots_metadata[entity_uri] = entity_snapshots
        if include_prov_metadata and other_snapshots and other_snapshots_metadata is not None:
            other_snapshots_metadata[entity_uri] = other_snapshots

    main_entity_times = sorted(
        set(entity_histories[self.res].keys()), key=lambda x: _parse_datetime(x)
    )

    merged_histories = {self.res: {}}

    # Pre-sort every related entity's timestamps once (string, datetime).
    related_sorted_times = {}
    for entity_uri, graphs_at_times in entity_histories.items():
        if entity_uri == self.res:
            continue
        related_sorted_times[entity_uri] = sorted(
            ((t, _parse_datetime(t)) for t in graphs_at_times),
            key=lambda x: x[1]
        )

    for timestamp in main_entity_times:
        merged_set = set(entity_histories[self.res][timestamp])
        timestamp_dt = _parse_datetime(timestamp)

        for entity_uri, sorted_times in related_sorted_times.items():
            graphs_at_times = entity_histories[entity_uri]
            if timestamp in graphs_at_times:
                # Exact timestamp match: use that version directly.
                related_quads = graphs_at_times[timestamp]
            else:
                # Otherwise fall back to the latest earlier version.
                relevant_time = None
                for rt, rt_dt in sorted_times:
                    if rt_dt <= timestamp_dt:
                        relevant_time = rt
                    else:
                        break
                if relevant_time:
                    related_quads = graphs_at_times[relevant_time]
                else:
                    continue
            merged_set.update(related_quads)

        merged_histories[self.res][timestamp] = merged_set

    return merged_histories, entity_snapshots_metadata, other_snapshots_metadata
def _get_entity_state_at_time(
    self,
    time: tuple[str | None, str | None],
    include_prov_metadata: bool
) -> tuple:
    """Reconstruct the entity's graphs for snapshots within the *time*
    interval.

    Returns (entity_graphs, entity_snapshots, other_snapshots_metadata):
    entity_graphs maps normalized timestamps to quad sets, rebuilt by
    applying the update queries of later snapshots in reverse to the
    current dataset state. The two metadata dicts are populated only when
    include_prov_metadata is True.
    """
    other_snapshots_metadata = {}
    is_quadstore = self.config["provenance"]["is_quadstore"]
    # On quadstores, provenance lives in a per-entity named graph.
    graph_statement = f"GRAPH <{self.res}/prov/>" if is_quadstore else ""
    if include_prov_metadata:
        query_snapshots = f"""
            SELECT ?snapshot ?time ?responsibleAgent ?updateQuery ?primarySource ?description ?invalidatedAtTime
            WHERE {{
                {graph_statement}
                {{
                    ?snapshot <{ProvEntity.iri_specialization_of}> <{self.res}>;
                        <{ProvEntity.iri_generated_at_time}> ?time;
                        <{ProvEntity.iri_was_attributed_to}> ?responsibleAgent.
                    OPTIONAL {{
                        ?snapshot <{ProvEntity.iri_invalidated_at_time}> ?invalidatedAtTime.
                    }}
                    OPTIONAL {{
                        ?snapshot <{ProvEntity.iri_description}> ?description.
                    }}
                    OPTIONAL {{
                        ?snapshot <{ProvEntity.iri_has_update_query}> ?updateQuery.
                    }}
                    OPTIONAL {{
                        ?snapshot <{ProvEntity.iri_had_primary_source}> ?primarySource.
                    }}
                }}
            }}
        """
    else:
        query_snapshots = f"""
            SELECT ?snapshot ?time ?updateQuery
            WHERE {{
                {graph_statement}
                {{
                    ?snapshot <{ProvEntity.iri_specialization_of}> <{self.res}>;
                        <{ProvEntity.iri_generated_at_time}> ?time.
                    OPTIONAL {{
                        ?snapshot <{ProvEntity.iri_has_update_query}> ?updateQuery.
                    }}
                }}
            }}
        """
    results = Sparql(query_snapshots, config=self.config).run_select_query()
    bindings = results['results']['bindings']
    if not bindings:
        return {}, {}, other_snapshots_metadata
    # Newest first; used both for interval filtering and for collecting the
    # update queries that must be undone.
    sorted_results = sorted(bindings, key=lambda x: _parse_datetime(x['time']['value']), reverse=True)
    # NOTE(review): _filter_timestamps_by_interval is not defined or imported
    # in this chunk — presumably defined elsewhere in this module; verify.
    relevant_results = _filter_timestamps_by_interval(time, sorted_results, time_index='time')
    if include_prov_metadata:
        # Snapshots outside the interval are reported separately.
        relevant_snapshot_uris = {relevant_result['snapshot']['value'] for relevant_result in relevant_results}
        other_snapshots = [snapshot for snapshot in bindings if snapshot['snapshot']['value'] not in relevant_snapshot_uris]
        for other_snapshot in other_snapshots:
            snapshot_uri = other_snapshot['snapshot']['value']
            other_snapshots_metadata[snapshot_uri] = {
                "generatedAtTime": other_snapshot['time']['value'],
                "invalidatedAtTime": other_snapshot.get('invalidatedAtTime', {}).get('value'),
                "wasAttributedTo": other_snapshot['responsibleAgent']['value'],
                "hasUpdateQuery": other_snapshot.get('updateQuery', {}).get('value'),
                "hadPrimarySource": other_snapshot.get('primarySource', {}).get('value'),
                "description": other_snapshot.get('description', {}).get('value')
            }
    if not relevant_results:
        # Nothing inside the interval: fall back to the latest snapshot at
        # or before the interval start, if one exists.
        interval_start = _parse_datetime(time[0]) if time[0] else None
        if interval_start:
            earlier_snapshots = [r for r in bindings if _parse_datetime(r['time']['value']) <= interval_start]
            if earlier_snapshots:
                latest_snapshot = max(earlier_snapshots, key=lambda x: _parse_datetime(x['time']['value']))
                relevant_results = [latest_snapshot]
            else:
                return {}, {}, other_snapshots_metadata
        else:
            return {}, {}, other_snapshots_metadata
    entity_snapshots = {}
    entity_graphs: dict[str, set[tuple[str, ...]]] = {}
    entity_quads = self._query_dataset(self.res)
    sorted_parsed = [(r, _parse_datetime(r['time']['value'])) for r in sorted_results]
    last_idx = len(relevant_results) - 1
    for i, relevant_result in enumerate(relevant_results):
        relevant_result_time = relevant_result['time']['value']
        relevant_result_dt = _parse_datetime(relevant_result_time)
        # Update queries of all snapshots strictly newer than this one;
        # undoing them rewinds the current state back to this snapshot.
        update_parts = [
            r['updateQuery']['value']
            for r, r_dt in sorted_parsed
            if 'updateQuery' in r and 'value' in r['updateQuery'] and r_dt > relevant_result_dt
        ]
        # Reuse the current quad set for the last (oldest processed) entry;
        # earlier entries get their own copy since each is rewound differently.
        entity_present_graph = entity_quads if i == last_idx else set(entity_quads)
        if update_parts:
            self._manage_update_queries(entity_present_graph, ";".join(update_parts))
        timestamp_key = convert_to_datetime(relevant_result_time, stringify=True)
        entity_graphs[timestamp_key] = entity_present_graph  # type: ignore[index]
        if include_prov_metadata:
            snapshot_uri = relevant_result['snapshot']['value']
            entity_snapshots[snapshot_uri] = {
                "generatedAtTime": relevant_result_time,
                "invalidatedAtTime": relevant_result.get('invalidatedAtTime', {}).get('value'),
                "wasAttributedTo": relevant_result['responsibleAgent']['value'],
                "hasUpdateQuery": relevant_result.get('updateQuery', {}).get('value'),
                "hadPrimarySource": relevant_result.get('primarySource', {}).get('value'),
                "description": relevant_result.get('description', {}).get('value')
            }
    return entity_graphs, entity_snapshots, other_snapshots_metadata
def _include_prov_metadata(self, triples_generated_at_time: list, current_state: set[tuple[str, ...]]) -> dict:
    """Build {res: {snapshot_uri: metadata_dict}} from provenance quads.

    Returns {} when the resource itself is typed as a prov Entity (i.e. the
    caller asked for metadata about a provenance snapshot, not a data
    entity). wasDerivedFrom values are collected as a sorted list; all other
    properties are single-valued (the last quad wins).
    """
    res_n3 = f"<{self.res}>"
    entity_n3 = f"<{ProvEntity.iri_entity}>"
    for quad in current_state:
        if quad[0] == res_n3 and quad[1] == _RDF_TYPE_N3 and quad[2] == entity_n3:
            return {}
    # N3 predicate -> JSON-style metadata key.
    prov_properties = {
        f"<{ProvEntity.iri_invalidated_at_time}>": "invalidatedAtTime",
        f"<{ProvEntity.iri_was_attributed_to}>": "wasAttributedTo",
        f"<{ProvEntity.iri_had_primary_source}>": "hadPrimarySource",
        f"<{ProvEntity.iri_description}>": "description",
        f"<{ProvEntity.iri_has_update_query}>": "hasUpdateQuery",
        f"<{ProvEntity.iri_was_derived_from}>": "wasDerivedFrom"
    }
    prov_metadata: dict = {
        self.res: {}
    }
    # Seed one metadata record per snapshot, keyed by snapshot URI.
    for triple in triples_generated_at_time:
        time = convert_to_datetime(_n3_value(triple[2]), stringify=True)
        snapshot_uri_str = _n3_value(triple[0])
        prov_metadata[self.res][snapshot_uri_str] = {
            "generatedAtTime": time,
            "invalidatedAtTime": None,
            "wasAttributedTo": None,
            "hadPrimarySource": None,
            "description": None,
            "hasUpdateQuery": None,
            "wasDerivedFrom": []
        }
    # Index the provenance quads once: subject -> predicate -> [values].
    prov_prop_n3_set = set(prov_properties)
    index: dict[str, dict[str, list[str]]] = {}
    for quad in current_state:
        if quad[1] in prov_prop_n3_set:
            index.setdefault(quad[0], {}).setdefault(quad[1], []).append(_n3_value(quad[2]))
    for metadata in dict(prov_metadata).values():
        for se_uri_str, snapshot_data in metadata.items():
            se_n3 = f"<{se_uri_str}>"
            se_props = index.get(se_n3, {})
            for prov_prop_n3, abbr in prov_properties.items():
                for value in se_props.get(prov_prop_n3, ()):
                    if abbr == "wasDerivedFrom":
                        snapshot_data[abbr].append(value)
                    else:
                        snapshot_data[abbr] = value
            if isinstance(snapshot_data.get("wasDerivedFrom"), list):
                # Deterministic ordering of the derivation sources.
                snapshot_data["wasDerivedFrom"] = sorted(snapshot_data["wasDerivedFrom"])

    return prov_metadata
def _get_entity_current_state(self, include_prov_metadata: bool = False) -> list:
    """Return [graphs_by_entity, metadata, prov_quads] describing the
    entity's current state plus placeholders for every known snapshot time.

    graphs_by_entity is {res: {time_string: quads-or-None}}; only the most
    recent time holds the current dataset quads, older times are None and
    are filled in later by _get_old_graphs. The trailing prov_quads element
    is consumed (popped) by _get_old_graphs.
    """
    entity_current_state: list = [{self.res: {}}]
    prov_quads = self._query_provenance(include_prov_metadata)
    if len(prov_quads) == 0:
        # No provenance: nothing to reconstruct.
        entity_current_state.append({})
        return entity_current_state
    dataset_quads = self._query_dataset(self.res)
    gen_at_time_n3 = f"<{ProvEntity.iri_generated_at_time}>"
    triples_generated_at_time = [
        quad for quad in prov_quads if quad[1] == gen_at_time_n3
    ]
    # Track the latest generation time; it maps to the current dataset state.
    most_recent_time = None
    most_recent_time_str: str | None = None
    for quad in triples_generated_at_time:
        snapshot_time_str = _n3_value(quad[2])
        snapshot_date_time = _parse_datetime(snapshot_time_str)
        if most_recent_time:
            if snapshot_date_time > most_recent_time:
                most_recent_time = snapshot_date_time
                most_recent_time_str = snapshot_time_str
        else:
            most_recent_time = snapshot_date_time
            most_recent_time_str = snapshot_time_str
        entity_current_state[0][self.res][snapshot_time_str] = None
    # NOTE(review): if prov_quads is non-empty but carries no generation
    # times, most_recent_time_str is still None here and the current quads
    # get keyed under None — presumably provenance always has times; verify.
    entity_current_state[0][self.res][most_recent_time_str] = dataset_quads
    if include_prov_metadata:
        prov_metadata = self._include_prov_metadata(
            triples_generated_at_time, prov_quads
        )
        entity_current_state.append(prov_metadata)
    else:
        entity_current_state.append(None)
    entity_current_state.append(prov_quads)
    return entity_current_state
def _get_old_graphs(self, entity_current_state: list) -> list:
    """Fill in the older versions of the entity by rewinding update queries.

    Walks snapshot times from newest to oldest: each older graph starts as a
    copy of the next newer one with that newer snapshot's update query
    applied in reverse (no query means the two states are identical).
    Finally all time keys are normalized to stringified datetimes.
    """
    # prov_quads was appended by _get_entity_current_state; consume it here.
    prov_quads = entity_current_state.pop(2) if len(entity_current_state) > 2 else set()
    snapshot_update_queries = _extract_snapshot_update_queries(prov_quads)
    ordered_data: list[tuple[str, set[tuple[str, ...]]]] = sorted(
        entity_current_state[0][self.res].items(),
        key=lambda x: _parse_datetime(str(x[0])),
        reverse=True
    )
    if not ordered_data:
        return entity_current_state
    for index, date_graph in enumerate(ordered_data):
        if index > 0:
            # The entry just before this one (newer) is already materialized.
            next_snapshot = ordered_data[index-1][0]
            previous_graph = set(entity_current_state[0][self.res][next_snapshot])
            update_query = snapshot_update_queries.get(str(next_snapshot))
            if update_query is None:
                entity_current_state[0][self.res][date_graph[0]] = previous_graph
            else:
                self._manage_update_queries(previous_graph, update_query)
                entity_current_state[0][self.res][date_graph[0]] = previous_graph
    # Re-key every version under a normalized timestamp string.
    for time in list(entity_current_state[0][self.res]):
        quad_set = entity_current_state[0][self.res].pop(time)
        time_str = str(convert_to_datetime(str(time), stringify=True))
        entity_current_state[0][self.res][time_str] = quad_set
    return entity_current_state
def iter_versions(self):
    """Yield (normalized_time_string, quad_set) per snapshot, newest first.

    Each older version is obtained by applying the newer snapshot's update
    query in reverse to a working copy of the current dataset state. Yields
    nothing when the entity has no provenance.
    """
    prov_quads = self._query_provenance(include_prov_metadata=False)
    if len(prov_quads) == 0:
        return
    dataset_quads = self._query_dataset(self.res)
    working: set[tuple[str, ...]] = set(dataset_quads)
    snapshots = _extract_snapshot_update_queries(prov_quads)
    ordered = sorted(snapshots.items(), key=lambda x: _parse_datetime(x[0]), reverse=True)
    for i, (time_str, _update_query) in enumerate(ordered):
        if i > 0:
            # Rewind the previously yielded (newer) snapshot's changes.
            prev_update = ordered[i - 1][1]
            if prev_update is not None:
                self._manage_update_queries(working, prev_update)
        normalized = str(convert_to_datetime(time_str, stringify=True))
        # Copy so consumers cannot mutate the working state.
        yield normalized, set(working)
@classmethod
def _manage_update_queries(cls, graph: set, update_query: str) -> None:
    """Apply *update_query* to *graph* in reverse, mutating it in place.

    Quads deleted by the query are put back and quads it inserted are
    removed, stepping the graph one snapshot back in time.
    """
    for operation_type, quads in _fast_parse_update(update_query):
        if operation_type == 'DeleteData':
            graph.update(quads)
        elif operation_type == 'InsertData':
            graph.difference_update(quads)
def _query_dataset(self, entity_uri: str | None = None) -> set[tuple[str, ...]]:
    """Fetch the current triples/quads of *entity_uri* (default: self.res)
    from the dataset endpoint as a set of N3-formatted tuples.

    On quadstores the named graph is included as a fourth element.
    """
    entity_uri = self.res if entity_uri is None else entity_uri

    is_quadstore = self.config['dataset']['is_quadstore']

    if is_quadstore:
        query_dataset = f"""
            SELECT ?s ?p ?o ?g
            WHERE {{
                GRAPH ?g {{
                    VALUES ?s {{<{entity_uri}>}}
                    ?s ?p ?o
                }}
            }}
        """
    else:
        query_dataset = f"""
            SELECT ?s ?p ?o
            WHERE {{
                VALUES ?s {{<{entity_uri}>}}
                ?s ?p ?o
            }}
        """

    return Sparql(query_dataset, config=self.config).run_select_to_quad_set()
def _query_provenance(self, include_prov_metadata: bool = False) -> set[tuple[str, ...]]:
    """Fetch the provenance triples of this entity's snapshots.

    With include_prov_metadata the query also requires agent/description
    on each snapshot and retrieves the full predicate set; otherwise only
    the predicates needed for graph reconstruction are fetched.
    """
    if include_prov_metadata:
        query_provenance = f"""
            SELECT ?s ?p ?o WHERE {{
                ?s <{ProvEntity.iri_specialization_of}> <{self.res}>;
                    <{ProvEntity.iri_was_attributed_to}> ?_agent;
                    <{ProvEntity.iri_generated_at_time}> ?_t;
                    <{ProvEntity.iri_description}> ?_desc.
                ?s ?p ?o.
                VALUES ?p {{
                    <{ProvEntity.iri_generated_at_time}>
                    <{ProvEntity.iri_was_attributed_to}>
                    <{ProvEntity.iri_had_primary_source}>
                    <{ProvEntity.iri_description}>
                    <{ProvEntity.iri_has_update_query}>
                    <{ProvEntity.iri_invalidated_at_time}>
                    <{ProvEntity.iri_was_derived_from}>
                    <{ProvEntity.iri_specialization_of}>
                }}
            }}
        """
    else:
        query_provenance = f"""
            SELECT ?s ?p ?o WHERE {{
                ?s <{ProvEntity.iri_specialization_of}> <{self.res}>;
                    <{ProvEntity.iri_generated_at_time}> ?_t.
                ?s ?p ?o.
                VALUES ?p {{
                    <{ProvEntity.iri_generated_at_time}>
                    <{ProvEntity.iri_has_update_query}>
                    <{ProvEntity.iri_was_derived_from}>
                    <{ProvEntity.iri_specialization_of}>
                }}
            }}
        """
    return Sparql(query_provenance, config=self.config).run_select_to_quad_set()
881 def _find_merged_entities(self, entity_uri: str) -> set[str]:
882 merged_entity_uris = set()
883 query_simple = f"""
884 SELECT ?merged_entity_uri
885 WHERE {{
886 ?snapshot <{ProvEntity.iri_specialization_of}> <{entity_uri}> .
887 ?snapshot <{ProvEntity.iri_was_derived_from}> ?derived_snapshot .
888 ?derived_snapshot <{ProvEntity.iri_specialization_of}> ?merged_entity_uri .
889 FILTER (?merged_entity_uri != <{entity_uri}>)
890 }}
891 """
892 try:
893 results = Sparql(query_simple, config=self.config).run_select_query()
894 bindings = results.get('results', {}).get('bindings', [])
895 for binding in bindings:
896 if 'merged_entity_uri' in binding and 'value' in binding['merged_entity_uri']:
897 merged_entity_uris.add(binding['merged_entity_uri']['value'])
898 except Exception as e:
899 print(f"Error querying for merged entities for {entity_uri}: {e}")
901 return merged_entity_uris
903 def _find_reverse_related_entities(self, entity_uri: str) -> set[str]:
904 reverse_related_entity_uris = set()
906 is_quadstore = self.config['dataset']['is_quadstore']
908 if is_quadstore:
909 query = f"""
910 SELECT ?subject
911 WHERE {{
912 GRAPH ?g {{
913 ?subject ?predicate <{entity_uri}> .
914 FILTER(?predicate != <{_RDF_TYPE}> && !strstarts(str(?predicate), "{ProvEntity.PROV}"))
915 }}
916 }}
917 """
918 else:
919 query = f"""
920 SELECT ?subject
921 WHERE {{
922 ?subject ?predicate <{entity_uri}> .
923 FILTER(?predicate != <{_RDF_TYPE}> && !strstarts(str(?predicate), "{ProvEntity.PROV}"))
924 }}
925 """
927 try:
928 results = Sparql(query, config=self.config).run_select_query()
929 bindings = results.get('results', {}).get('bindings', [])
930 for binding in bindings:
931 if 'subject' in binding and 'value' in binding['subject']:
932 subject_uri = binding['subject']['value']
933 if subject_uri != entity_uri:
934 reverse_related_entity_uris.add(subject_uri)
935 except Exception as e:
936 print(f"Error querying for reverse related entities for {entity_uri}: {e}")
938 return reverse_related_entity_uris
940def _filter_timestamps_by_interval(interval: tuple[str | None, str | None] | None, iterator: list, time_index: str | None = None) -> list:
941 if interval:
942 after_time = _parse_datetime(interval[0]) if interval[0] else None
943 before_time = _parse_datetime(interval[1]) if interval[1] else None
944 relevant_timestamps = []
945 for timestamp in iterator:
946 if time_index is not None and time_index in timestamp:
947 time_binding = timestamp[time_index]
948 if 'value' in time_binding:
949 time_str = time_binding['value']
950 time = _parse_datetime(time_str)
951 else:
952 continue
953 else:
954 continue
955 if after_time and before_time:
956 if after_time <= time <= before_time:
957 relevant_timestamps.append(timestamp)
958 elif after_time and not before_time:
959 if time >= after_time:
960 relevant_timestamps.append(timestamp)
961 elif before_time and not after_time:
962 if time <= before_time:
963 relevant_timestamps.append(timestamp)
964 else:
965 relevant_timestamps.append(timestamp)
966 else:
967 relevant_timestamps = iterator.copy()
968 return relevant_timestamps