Coverage for src / time_agnostic_library / agnostic_query.py: 100%
688 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-03-21 11:54 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-03-21 11:54 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
8import atexit
9import json
10import os
11from concurrent.futures import ThreadPoolExecutor, as_completed
13from rdflib.plugins.sparql.parserutils import CompValue
14from rdflib.plugins.sparql.processor import prepareQuery
16from time_agnostic_library.agnostic_entity import (
17 AgnosticEntity,
18 _compose_update_queries,
19 _fast_parse_update,
20 _filter_timestamps_by_interval,
21 _parse_datetime,
22)
23from time_agnostic_library.prov_entity import ProvEntity
24from time_agnostic_library.sparql import Sparql, _binding_to_n3, _n3_to_binding
25from time_agnostic_library.support import convert_to_datetime
# Default location of the JSON configuration file, relative to the CWD.
CONFIG_PATH = "./config.json"

# Below this many work items the parallel helpers run sequentially:
# spinning up threads would cost more than it saves.
_PARALLEL_THRESHOLD = os.cpu_count() or 1

# Shared two-worker pool used to overlap pairs of independent SPARQL requests.
_IO_EXECUTOR = ThreadPoolExecutor(max_workers=2)
# Best-effort shutdown at interpreter exit; wait=False so exit is not blocked
# by still-running requests.
atexit.register(_IO_EXECUTOR.shutdown, wait=False)
def _run_in_parallel(worker_fn, args_list):
    """Yield ``worker_fn(*args)`` for every args tuple in *args_list*.

    Small batches (fewer items than ``_PARALLEL_THRESHOLD``) run sequentially
    on the calling thread, where thread startup would dominate. Larger
    batches fan out to a thread pool; results are then yielded in
    *completion* order, so callers must not rely on input order.

    Any exception raised by a worker propagates to the consumer via
    ``future.result()``.
    """
    if len(args_list) < _PARALLEL_THRESHOLD:
        for args in args_list:
            yield worker_fn(*args)
        return
    with ThreadPoolExecutor() as executor:
        # Submit everything up front, then stream results as they finish.
        # (The original kept a future -> index mapping that was never read;
        # a plain list is all as_completed needs.)
        pending = [executor.submit(worker_fn, *args) for args in args_list]
        for future in as_completed(pending):
            yield future.result()
def _reconstruct_entity_worker(entity, config, on_time, other_snapshots_flag):
    """Thread-pool worker: rebuild one entity's past state(s).

    Returns an ``(entity, graphs, other_snapshots)`` triple. With *on_time*
    set, *graphs* holds the state(s) inside that interval and
    *other_snapshots* the extra snapshot metadata (only populated when
    *other_snapshots_flag* is True). Without *on_time*, *graphs* is the full
    history and *other_snapshots* is empty.
    """
    agnostic_entity = AgnosticEntity(
        entity, config=config,
        # Restrict reconstruction to the entity itself: related, merged and
        # reverse material is handled separately by the query machinery.
        include_related_objects=False,
        include_merged_entities=False,
        include_reverse_relations=False,
    )
    if on_time:
        entity_graphs, _, other_snapshots = agnostic_entity.get_state_at_time(
            time=on_time, include_prov_metadata=other_snapshots_flag,
        )
        return entity, entity_graphs, other_snapshots
    entity_history = agnostic_entity.get_history(include_prov_metadata=True)
    # get_history returns (graphs, metadata); the metadata is not needed here.
    return entity, entity_history[0], {}
62def _sparql_values(uris: set[str]) -> str:
63 return " ".join(f"<{uri}>" for uri in uris)
66def _wrap_in_graph(body: str, is_quadstore: bool) -> str:
67 if is_quadstore:
68 return f"GRAPH ?g {{ {body} }}"
69 return body
def _batch_query_provenance_snapshots(entity_uris: set[str], config: dict) -> dict[str, list[dict]]:
    """Fetch every provenance snapshot of all *entity_uris* in one SELECT.

    Returns a dict mapping each entity URI to a (unsorted) list of
    ``{'time': str, 'updateQuery': str | None}`` entries; entities with no
    snapshots map to an empty list.
    """
    values = _sparql_values(entity_uris)
    body = f"""
    ?snapshot <{ProvEntity.iri_specialization_of}> ?entity;
        <{ProvEntity.iri_generated_at_time}> ?time.
    OPTIONAL {{ ?snapshot <{ProvEntity.iri_has_update_query}> ?updateQuery. }}
    VALUES ?entity {{ {values} }}
    """
    # Provenance may live in named graphs when the provenance store is a quad store.
    wrapped = _wrap_in_graph(body, config["provenance"]["is_quadstore"])
    query = f"SELECT ?entity ?time ?updateQuery WHERE {{ {wrapped} }}"
    results = Sparql(query, config).run_select_query()
    # Pre-seed so every requested entity appears in the output, even if empty.
    output: dict[str, list[dict]] = {uri: [] for uri in entity_uris}
    for binding in results['results']['bindings']:
        entity_uri = binding['entity']['value']
        entry = {
            'time': binding['time']['value'],
            # updateQuery is OPTIONAL in the pattern and may be absent.
            'updateQuery': binding['updateQuery']['value'] if 'updateQuery' in binding else None,
        }
        output[entity_uri].append(entry)
    return output
94def _sparql_filter_in(var: str, uris: set[str]) -> str:
95 return f"FILTER({var} IN ({', '.join(f'<{uri}>' for uri in uris)}))"
def _batch_query_dataset_triples(entity_uris: set[str], config: dict) -> dict[str, set[tuple]]:
    """Fetch the current triples of all *entity_uris* in one SELECT.

    Returns a dict mapping each entity URI to a set of N3 triples — or quads
    including the named graph when the dataset is a quad store. Entities with
    no data map to an empty set.
    """
    is_quadstore = config['dataset']['is_quadstore']
    body = f"?s ?p ?o. {_sparql_filter_in('?s', entity_uris)}"
    wrapped = _wrap_in_graph(body, is_quadstore)
    select_vars = "?s ?p ?o ?g" if is_quadstore else "?s ?p ?o"
    query = f"SELECT {select_vars} WHERE {{ {wrapped} }}"
    results = Sparql(query, config).run_select_query()
    # Pre-seed so every requested entity appears in the output, even if empty.
    output: dict[str, set[tuple]] = {uri: set() for uri in entity_uris}
    for binding in results['results']['bindings']:
        # Raw URI used for grouping; N3 forms stored in the quad itself.
        s_val = binding['s']['value']
        s = _binding_to_n3(binding['s'])
        p = _binding_to_n3(binding['p'])
        o = _binding_to_n3(binding['o'])
        if is_quadstore and 'g' in binding:
            output[s_val].add((s, p, o, _binding_to_n3(binding['g'])))
        else:
            output[s_val].add((s, p, o))
    return output
def _iter_versions_as_sets(
    prov_snapshots: list[dict],
    dataset_quads: set[tuple],
    relevant_times: set[str] | None = None,
) -> list[tuple[str, tuple]]:
    """Derive past entity states by rewinding provenance update queries.

    Starting from the current *dataset_quads*, walk the snapshots from newest
    to oldest, inverting each newer snapshot's update query (re-adding its
    deletions, removing its insertions) to obtain the state at each snapshot
    time. When *relevant_times* is given, only those timestamps are
    materialized and the walk stops as soon as all of them have been found.

    Returns ``[(normalized_timestamp, quads_tuple), ...]`` in newest-first order.
    """
    sorted_snaps = sorted(prov_snapshots, key=lambda x: _parse_datetime(x['time']), reverse=True)
    target_count = len(relevant_times) if relevant_times else None
    working = set(dataset_quads)
    results = []
    found = 0
    for i, snap in enumerate(sorted_snaps):
        if i > 0:
            # Rewind one step: invert the update query of the next-newer snapshot.
            prev_uq = sorted_snaps[i - 1]['updateQuery']
            if prev_uq is not None:
                for op_type, quads in _fast_parse_update(prev_uq):
                    if op_type == 'DeleteData':
                        # What the newer snapshot deleted existed before it.
                        for quad in quads:
                            working.add(quad)
                    elif op_type == 'InsertData':
                        # What it inserted did not exist before it.
                        for quad in quads:
                            working.discard(quad)
        if relevant_times is None or snap['time'] in relevant_times:
            normalized = str(convert_to_datetime(snap['time'], stringify=True))
            # Freeze the current working set; it keeps mutating on later steps.
            results.append((normalized, tuple(working)))
            found += 1
            if target_count is not None and found == target_count:
                break
    return results
def _reconstruct_at_time_as_sets(
    prov_snapshots: list[dict],
    dataset_quads: set[tuple],
    on_time: tuple[str | None, str | None],
) -> list[tuple[str, tuple]]:
    """Materialize the entity states generated inside the *on_time* interval.

    If no snapshot falls inside the interval, fall back to the newest
    snapshot generated at or before the interval start — the state still in
    force when the interval opened. Returns the same shape as
    ``_iter_versions_as_sets``, or ``[]`` when nothing applies.
    """
    if not prov_snapshots:
        return []
    sorted_snaps = sorted(prov_snapshots, key=lambda x: _parse_datetime(x['time']), reverse=True)
    relevant = _filter_timestamps_by_interval(
        on_time,
        # Adapt to the SPARQL-binding shape the filter helper expects.
        [{'time': {'value': s['time']}} for s in sorted_snaps],
        time_index='time',
    )
    if not relevant:
        interval_start = _parse_datetime(on_time[0]) if on_time[0] else None
        if interval_start:
            # Fallback: latest snapshot at or before the interval start.
            earlier = [s for s in sorted_snaps if _parse_datetime(s['time']) <= interval_start]
            if earlier:
                best = max(earlier, key=lambda x: _parse_datetime(x['time']))
                relevant = [{'time': {'value': best['time']}}]
            else:
                return []
        else:
            # Open-ended start with no snapshot in range: nothing to return.
            return []
    relevant_times = {r['time']['value'] for r in relevant}
    return _iter_versions_as_sets(prov_snapshots, dataset_quads, relevant_times)
def _match_single_pattern(triple_pattern: tuple, quads: tuple) -> list[dict]:
    """Match a single (s, p, o) pattern against *quads*.

    Constant pattern terms must equal the corresponding quad term exactly;
    each ``?var`` term yields an entry in the resulting binding dict
    (SPARQL-JSON value shape). Returns one binding dict per matching quad.
    """
    slots = list(enumerate(triple_pattern[:3]))
    constants = [(idx, term) for idx, term in slots if not term.startswith('?')]
    variables = [(idx, term[1:]) for idx, term in slots if term.startswith('?')]
    matches: list[dict] = []
    for quad in quads:
        if any(quad[idx] != term for idx, term in constants):
            continue
        matches.append({name: _n3_to_binding(quad[idx]) for idx, name in variables})
    return matches
def _merge_entity_bindings(entity_bindings: dict[str, dict[str, list[dict]]]) -> dict[str, list[dict]]:
    """Merge per-entity, per-timestamp bindings into one global timeline.

    Timestamps are visited chronologically; an entity with no bindings at a
    given timestamp carries its most recent earlier bindings forward, so each
    timestamp reflects the state of every entity, not only of those that
    changed at that moment.
    """
    all_timestamps: set[str] = set()
    for per_ts in entity_bindings.values():
        all_timestamps.update(per_ts.keys())
    sorted_timestamps = sorted(all_timestamps, key=_parse_datetime)
    result: dict[str, list[dict]] = {}
    # Most recent bindings seen so far for each entity (carried forward).
    last_known: dict[str, list[dict]] = {}
    for ts in sorted_timestamps:
        merged: list[dict] = []
        for entity_str, per_ts in entity_bindings.items():
            if ts in per_ts:
                last_known[entity_str] = per_ts[ts]
            if entity_str in last_known:
                merged.extend(last_known[entity_str])
        result[ts] = merged
    return result
def _batch_query_dm_provenance(entity_uris: set[str], config: dict) -> dict[str, list[dict]]:
    """Fetch delta-materialization provenance for all *entity_uris* at once.

    Like ``_batch_query_provenance_snapshots``, but also retrieves each
    snapshot's optional ``invalidatedAtTime``. Returns a dict mapping each
    entity URI to an (unsorted) list of
    ``{'time', 'updateQuery', 'invalidatedAtTime'}`` entries.
    """
    values = _sparql_values(entity_uris)
    query = f"""
    SELECT ?entity ?time ?updateQuery ?invalidatedAtTime
    WHERE {{
        ?se <{ProvEntity.iri_specialization_of}> ?entity;
            <{ProvEntity.iri_generated_at_time}> ?time.
        OPTIONAL {{
            ?se <{ProvEntity.iri_has_update_query}> ?updateQuery.
        }}
        OPTIONAL {{
            ?se <{ProvEntity.iri_invalidated_at_time}> ?invalidatedAtTime.
        }}
        VALUES ?entity {{ {values} }}
    }}
    """
    results = Sparql(query, config).run_select_query()
    # Pre-seed so every requested entity appears in the output, even if empty.
    output: dict[str, list[dict]] = {uri: [] for uri in entity_uris}
    for binding in results['results']['bindings']:
        entity_uri = binding['entity']['value']
        entry = {
            'time': binding['time']['value'],
            # Both fields are OPTIONAL in the pattern and may be absent.
            'updateQuery': binding['updateQuery']['value'] if 'updateQuery' in binding else None,
            'invalidatedAtTime': binding['invalidatedAtTime']['value'] if 'invalidatedAtTime' in binding else None,
        }
        output[entity_uri].append(entry)
    return output
def _build_delta_result(
    entity_str: str,
    snapshots: list[dict],
    on_time: tuple[str | None, str | None] | None,
    changed_properties: set[str],
) -> dict:
    """Summarize how *entity_str* changed within the optional *on_time* window.

    Returns ``{}`` when no snapshot falls in the window, otherwise
    ``{entity_str: {'created', 'deleted', 'additions', 'deletions'}}`` where
    additions/deletions are the net quad changes composed from the update
    queries inside the window, optionally restricted to *changed_properties*.

    NOTE(review): assumes *snapshots* is non-empty (indexing [0]/[-1] below)
    — confirm callers guarantee this.
    """
    output: dict[str, dict] = {}
    parsed_snaps = [(snap, _parse_datetime(snap['time'])) for snap in snapshots]
    parsed_snaps.sort(key=lambda x: x[1])  # oldest first
    after_dt = _parse_datetime(on_time[0]) if on_time and on_time[0] else None
    before_dt = _parse_datetime(on_time[1]) if on_time and on_time[1] else None
    creation_dt = parsed_snaps[0][1]
    update_queries: list[str] = []
    created = None
    has_relevant = False
    for snap, snap_dt in parsed_snaps:
        if after_dt and snap_dt < after_dt:
            continue
        if before_dt and snap_dt > before_dt:
            # Snapshots are sorted: nothing later can fall in the window.
            break
        has_relevant = True
        if snap_dt == creation_dt:
            # The very first snapshot marks entity creation, not an update.
            created = creation_dt.isoformat()
        elif snap['updateQuery']:
            update_queries.append(snap['updateQuery'])
    if not has_relevant:
        return output
    additions, deletions = _compose_update_queries(update_queries)
    if changed_properties:
        # Keep only quads whose predicate (index 1) is a requested property.
        prop_n3_set = {f"<{p}>" for p in changed_properties}
        additions = {q for q in additions if q[1] in prop_n3_set}
        deletions = {q for q in deletions if q[1] in prop_n3_set}
    last_snap = parsed_snaps[-1][0]
    # NOTE(review): 'deleted' reports the *generation* time of the last
    # snapshot whenever that snapshot carries an invalidatedAtTime — confirm
    # this is intended rather than using the invalidatedAtTime value itself.
    deleted = parsed_snaps[-1][1].isoformat() if last_snap['invalidatedAtTime'] else None
    output[entity_str] = {
        "created": created,
        "deleted": deleted,
        "additions": additions,
        "deletions": deletions,
    }
    return output
class AgnosticQuery:
    """Base machinery for time-agnostic SPARQL SELECT queries.

    Parses the query into triple patterns, reconstructs the past states of
    every entity those patterns can touch (via provenance snapshots), and
    materializes per-timestamp quad sets in ``self.relevant_graphs`` so the
    query can be answered against any version of the dataset.
    """

    # Full-text-index configuration; populated by __init_text_index from the
    # config. At most one indexing system may be enabled at a time.
    blazegraph_full_text_search: bool
    fuseki_full_text_search: bool
    virtuoso_full_text_search: bool
    graphdb_connector_name: str

    def __init__(self, query: str, on_time: tuple[str | None, str | None] | None = (None, None), other_snapshots: bool = False, config_path: str = CONFIG_PATH, config_dict: dict | None = None):
        """Load configuration, normalize the time interval and start reconstruction.

        :param query: the SPARQL SELECT string to run time-agnostically.
        :param on_time: optional (start, end) interval in ISO 8601; either
            edge may be None; passing None disables time filtering.
        :param other_snapshots: when True, also collect metadata about
            snapshots outside the requested interval.
        :param config_path: path of the JSON configuration file, used only
            when *config_dict* is None.
        :param config_dict: pre-loaded configuration; takes precedence over
            *config_path*.
        """
        self.query = query
        self.other_snapshots = other_snapshots
        self.config_path = config_path
        self.other_snapshots_metadata: dict = {}
        if config_dict is not None:
            self.config = config_dict
        else:
            with open(config_path, encoding="utf8") as json_file:
                self.config = json.load(json_file)
        self.__init_text_index(self.config)
        if on_time:
            # Normalize both interval edges to the library's canonical string form.
            after_time = convert_to_datetime(on_time[0], stringify=True)
            before_time = convert_to_datetime(on_time[1], stringify=True)
            self.on_time: tuple[str | None, str | None] | None = (after_time, before_time)  # type: ignore[assignment]
        else:
            self.on_time = None
        # URIs whose past states have already been rebuilt (avoids rework).
        self.reconstructed_entities: set[str] = set()
        # timestamp -> triples whose variables still need resolving.
        self.vars_to_explicit_by_time: dict = {}
        # entity URI -> {timestamp -> quad set} (per-entity view).
        self.relevant_entities_graphs: dict[str, dict[str, set]] = {}
        # timestamp -> merged quad set (global view used to answer the query).
        self.relevant_graphs: dict[str, set[tuple[str, ...]]] = {}
        self._rebuild_relevant_graphs()

    def __init_text_index(self, config: dict):
        """Validate the full-text-search settings in *config* and set them on self.

        :raises ValueError: for an unrecognized flag value, or when more than
            one indexing system is enabled simultaneously.
        """
        for full_text_search in ("blazegraph_full_text_search", "fuseki_full_text_search", "virtuoso_full_text_search"):
            ts_full_text_search: str = config[full_text_search]
            # NOTE(review): the bare ints 1 and 0 in these sets are unreachable
            # since str.lower() always returns a string — presumably leftovers.
            if ts_full_text_search.lower() in {"true", "1", 1, "t", "y", "yes", "ok"}:
                setattr(self, full_text_search, True)
            elif ts_full_text_search.lower() in {"false", "0", 0, "n", "f", "no"} or not ts_full_text_search:
                setattr(self, full_text_search, False)
            else:
                raise ValueError(f"Enter a valid value for '{full_text_search}' in the configuration file, for example 'yes' or 'no'.")
        self.graphdb_connector_name = config["graphdb_connector_name"]
        if len([index for index in [self.blazegraph_full_text_search, self.fuseki_full_text_search, self.virtuoso_full_text_search, self.graphdb_connector_name] if index]) > 1:
            raise ValueError("The use of multiple indexing systems simultaneously is currently not supported.")

    def _process_query(self) -> list[tuple[str, ...]]:
        """Parse ``self.query`` and return all triple patterns as N3 tuples.

        Side effects: populates ``self._optional_groups``,
        ``self._select_vars`` and ``self._mandatory_triples``.

        :raises ValueError: for non-SELECT queries or fully variable patterns.
        """
        # Parse the SPARQL string into an algebra tree via rdflib, then walk
        # the tree to extract triple patterns as N3 strings, separating
        # mandatory patterns from OPTIONAL groups.
        algebra = prepareQuery(self.query).algebra
        if algebra.name != "SelectQuery":
            raise ValueError("Only SELECT queries are allowed.")
        mandatory: list[tuple[str, ...]] = []
        self._optional_groups: list[list[tuple[str, ...]]] = []
        self._collect_patterns(algebra, mandatory)
        all_triples = list(mandatory)
        for group in self._optional_groups:
            all_triples.extend(group)
        # Reject triples made of only variables (e.g. ?s ?p ?o): they would
        # match every entity in the dataset, making the query too expensive.
        triples_without_hook = [t for t in all_triples if all(el.startswith('?') for el in t)]
        if triples_without_hook:
            raise ValueError("Could not perform a generic time agnostic query. Please, specify at least one URI or Literal within the query.")
        self._select_vars = [str(v) for v in algebra['PV']]
        self._mandatory_triples = mandatory
        return all_triples

    def _collect_patterns(self, node: CompValue, mandatory: list[tuple[str, ...]]) -> None:
        """Recursively walk the algebra tree, filling *mandatory* and
        ``self._optional_groups`` with N3 triple tuples."""
        name = node.name
        if name == 'LeftJoin':
            # OPTIONAL = left join: p1 (left, mandatory) must match, p2 (right,
            # optional) extends the binding if possible, otherwise it's ignored
            self._collect_patterns(node['p1'], mandatory)
            opt_group: list[tuple[str, ...]] = []
            self._collect_triples_flat(node['p2'], opt_group)
            if opt_group:
                self._optional_groups.append(opt_group)
        elif name == 'Join':
            # Both sides are mandatory (rdflib splits BGPs into Join nodes)
            self._collect_patterns(node['p1'], mandatory)
            self._collect_patterns(node['p2'], mandatory)
        elif 'triples' in node:
            # BGP leaf node: convert rdflib terms to N3 strings
            mandatory.extend(tuple(el.n3() for el in t) for t in node['triples'])
        else:
            # Transparent wrapper node (Project, Slice, ...): descend.
            for v in node.values():
                if isinstance(v, CompValue):
                    self._collect_patterns(v, mandatory)

    def _collect_triples_flat(self, node: CompValue, triples: list[tuple[str, ...]]) -> None:
        """Collect every triple under *node* without distinguishing OPTIONALs."""
        if 'triples' in node:
            triples.extend(tuple(el.n3() for el in t) for t in node['triples'])
        for v in node.values():
            if isinstance(v, CompValue):
                self._collect_triples_flat(v, triples)

    def _rebuild_relevant_graphs(self) -> None:
        """Reconstruct all entities referenced by the query's triple patterns.

        Isolated triples are additionally probed against provenance update
        queries to catch entities that matched only in the past; if any
        triple is not isolated, the variable-solving pass runs afterwards.
        """
        triples_checked = set()
        all_isolated = True
        self.triples = self._process_query()
        for triple in self.triples:
            if self._is_isolated(triple) and self._is_a_new_triple(triple, triples_checked):
                present_entities = self._get_present_entities(triple)
                self._rebuild_relevant_entity(triple[0])
                self._find_entities_in_update_queries(triple, present_entities)
            else:
                all_isolated = False
                self._rebuild_relevant_entity(triple[0])
            triples_checked.add(triple)
        self._align_snapshots()
        if not all_isolated:
            self._solve_variables()

    def _is_isolated(self, triple: tuple) -> bool:
        """Return True if *triple* shares no join chain with the rest of the query.

        A triple whose subject is a concrete IRI is never isolated, nor is one
        whose variables are reachable (transitively) from another triple.
        """
        if triple[0].startswith('<') and triple[0].endswith('>'):
            return False
        variables = [el for el in triple if el.startswith('?')]
        for variable in variables:
            other_triples = {t for t in self.triples if t != triple}
            if self._there_is_transitive_closure(variable, other_triples):
                return False
        return True

    def _there_is_transitive_closure(self, variable: str, triples: set[tuple]) -> bool:
        """Check whether *variable* appears as the object of a chain that is
        (transitively) anchored to a concrete IRI subject in *triples*."""
        there_is_transitive_closure = False
        for triple in triples:
            if variable in triple and triple.index(variable) == 2:
                if triple[0].startswith('<') and triple[0].endswith('>'):
                    return True
                elif triple[0].startswith('?'):
                    # Follow the chain through the variable subject.
                    other_triples = {t for t in triples if t != triple}
                    there_is_transitive_closure = self._there_is_transitive_closure(triple[0], other_triples)
        return there_is_transitive_closure

    def _rebuild_relevant_entity(self, entity_n3: str) -> None:
        """Reconstruct the entity named by the N3 IRI *entity_n3*, once.

        Variables and literals are ignored; already-reconstructed URIs are skipped.
        """
        if entity_n3.startswith('<') and entity_n3.endswith('>'):
            entity_uri = entity_n3[1:-1]
            if entity_uri not in self.reconstructed_entities:
                self.reconstructed_entities.add(entity_uri)
                result = self._reconstruct_entity_state(entity_uri)
                if result is not None:
                    self._merge_entity_result(entity_uri, *result)

    def _reconstruct_entity_state(self, entity_uri: str) -> tuple[dict, dict] | None:
        """Rebuild *entity_uri*'s state(s): interval-bounded when
        ``self.on_time`` is set, full history otherwise.

        Returns (graphs, other_snapshots_metadata).
        """
        agnostic_entity = AgnosticEntity(entity_uri, config=self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
        if self.on_time:
            entity_graphs, _, other_snapshots = agnostic_entity.get_state_at_time(time=self.on_time, include_prov_metadata=self.other_snapshots)
            return entity_graphs, other_snapshots
        entity_history = agnostic_entity.get_history(include_prov_metadata=True)
        return entity_history[0], {}

    def _merge_entity_result(self, entity_uri: str, entity_graphs: dict, other_snapshots: dict) -> None:
        """Fold one reconstruction result into the instance-level state.

        With ``self.on_time`` set, *entity_graphs* maps timestamps to quad
        sets; otherwise it is keyed by entity URI (get_history shape).
        """
        if other_snapshots:
            self.other_snapshots_metadata.update(other_snapshots)
        if self.on_time:
            if entity_graphs:
                for relevant_timestamp, quad_set in entity_graphs.items():
                    self.relevant_entities_graphs.setdefault(entity_uri, {})[relevant_timestamp] = quad_set
        else:
            if entity_graphs.get(entity_uri):
                self.relevant_entities_graphs.update(entity_graphs)

    def _get_present_entities(self, triple: tuple) -> set[str]:
        """Query the current dataset for the entity URIs matching *triple*.

        For an inverse predicate (``^``) the relevant entity sits in object
        position; otherwise it is the subject. Only URI bindings are kept.
        """
        variables = [el for el in triple if el.startswith('?')]
        if self.config['dataset']['is_quadstore']:
            # Exclude provenance named graphs from the match.
            query = f"SELECT {' '.join(variables)} WHERE {{GRAPH ?_g {{{triple[0]} {triple[1]} {triple[2]}}} FILTER(!CONTAINS(STR(?_g), '/prov/'))}}"
        else:
            query = f"SELECT {' '.join(variables)} WHERE {{{triple[0]} {triple[1]} {triple[2]}}}"
        results = Sparql(query, self.config).run_select_query()
        bindings = results['results']['bindings']
        if triple[1].startswith('^'):
            if triple[2].startswith('?'):
                var_name = triple[2][1:]
                return {b[var_name]['value'] for b in bindings if var_name in b and b[var_name]['type'] == 'uri'}
            # Concrete object with an inverse predicate: the object itself is the entity.
            return {triple[2][1:-1]} if bindings else set()
        var_name = triple[0][1:]
        return {b[var_name]['value'] for b in bindings if var_name in b and b[var_name]['type'] == 'uri'}

    def _is_a_new_triple(self, triple: tuple, triples_checked: set) -> bool:
        """Return True if *triple* mentions at least one IRI not already
        covered by a previously checked triple (avoids duplicate lookups)."""
        uris_in_triple = {el for el in triple if el.startswith('<') and el.endswith('>')}
        for triple_checked in triples_checked:
            uris_in_triple_checked = {el for el in triple_checked if el.startswith('<') and el.endswith('>')}
            if not uris_in_triple.difference(uris_in_triple_checked):
                return False
        return True

    def _get_query_to_update_queries(self, triple: tuple) -> str:
        """Build the provenance query that finds update queries mentioning
        every IRI of *triple*, using the configured full-text index."""
        uris_n3 = {el for el in triple if el.startswith('<') and el.endswith('>')}
        return self.get_full_text_search(uris_n3)

    def get_full_text_search(self, uris_in_triple: set) -> str:
        """Return a SELECT over ?updateQuery matching all *uris_in_triple*,
        phrased for whichever full-text index is configured (or plain
        FILTER CONTAINS when none is)."""
        # Strip angle brackets: the indexes search raw URI strings.
        uris_in_triple = {el[1:-1] if el.startswith('<') and el.endswith('>') else el for el in uris_in_triple}
        if self.blazegraph_full_text_search:
            query_to_identify = f'''
                PREFIX bds: <http://www.bigdata.com/rdf/search#>
                SELECT ?updateQuery
                WHERE {{
                    ?snapshot <{ProvEntity.iri_has_update_query}> ?updateQuery.
                    ?updateQuery bds:search "{' '.join(uris_in_triple)}";
                        bds:matchAllTerms 'true'.
                }}
            '''
        elif self.fuseki_full_text_search:
            # Jena text index: quoted terms joined with AND.
            query_obj = '\\" AND \\"'.join(uris_in_triple)
            query_to_identify = f'''
                PREFIX text: <http://jena.apache.org/text#>
                SELECT ?updateQuery WHERE {{
                    ?se text:query "\\"{query_obj}\\"";
                        <{ProvEntity.iri_has_update_query}> ?updateQuery.
                }}
            '''
        elif self.virtuoso_full_text_search:
            query_obj = "' AND '".join(uris_in_triple)
            query_to_identify = f'''
                PREFIX bif: <bif:>
                SELECT ?updateQuery
                WHERE {{
                    ?snapshot <{ProvEntity.iri_has_update_query}> ?updateQuery.
                    ?updateQuery bif:contains "'{query_obj}'".
                }}
            '''
        elif self.graphdb_connector_name:
            # NOTE(review): 'all' shadows the builtin; kept unchanged here.
            all = '\"'
            con_queries = f"con:query '{all}" + f"{all}'; con:query '{all}".join(uris_in_triple) + f"{all}'"
            query_to_identify = f'''
                PREFIX con: <http://www.ontotext.com/connectors/lucene#>
                PREFIX con-inst: <http://www.ontotext.com/connectors/lucene/instance#>
                SELECT ?updateQuery
                WHERE {{
                    ?snapshot <{ProvEntity.iri_has_update_query}> ?updateQuery.
                    [] a con-inst:{self.graphdb_connector_name};
                        {con_queries};
                        con:entities ?snapshot.
                }}
            '''
        else:
            # No index configured: fall back to (slow) substring filters.
            query_to_identify = f'''
                SELECT ?updateQuery
                WHERE {{
                    ?snapshot <{ProvEntity.iri_has_update_query}> ?updateQuery.
                    {').'.join([f"FILTER CONTAINS (?updateQuery, '{uri}'" for uri in uris_in_triple])}).
                }}
            '''
        return query_to_identify

    def _find_entity_uris_in_update_queries(self, triple: tuple, entities: set) -> None:
        """Add to *entities* (in place) the subjects of past quads matching *triple*.

        Without a full-text index, the store itself joins snapshots to update
        queries; with one, the matching update queries are fetched and parsed
        client-side.
        """
        uris_n3 = {el for el in triple if el.startswith('<') and el.endswith('>')}
        uris_str = {el[1:-1] for el in uris_n3}
        if not any([self.blazegraph_full_text_search, self.fuseki_full_text_search,
                    self.virtuoso_full_text_search, self.graphdb_connector_name]):
            filter_clauses = ".".join(
                f"FILTER CONTAINS (?uq, '{uri}')" for uri in uris_str
            )
            query = f"""
                SELECT ?entity WHERE {{
                    ?snapshot <{ProvEntity.iri_specialization_of}> ?entity;
                        <{ProvEntity.iri_has_update_query}> ?uq.
                    {filter_clauses}.
                }}
            """
            results = Sparql(query, self.config).run_select_query()
            for binding in results['results']['bindings']:
                entities.add(binding['entity']['value'])
            return
        query_to_identify = self._get_query_to_update_queries(triple)
        results = Sparql(query_to_identify, self.config).run_select_query()
        for binding in results['results']['bindings']:
            uq = binding.get('updateQuery')
            if uq and uq.get('value'):
                # Parse each update query and keep subjects of quads that
                # contain every IRI of the triple being matched.
                for _, quads in _fast_parse_update(uq['value']):
                    for quad in quads:
                        quad_uris = {el for el in quad[:3] if el.startswith('<') and el.endswith('>')}
                        if uris_n3.issubset(quad_uris):
                            entities.add(quad[0][1:-1])

    def _find_entities_in_update_queries(self, triple: tuple, present_entities: set | None = None):
        """Reconstruct every entity matching *triple* now or in the past.

        Past matches are discovered through provenance update queries and
        merged with *present_entities*; reconstruction fans out to a thread
        pool for large batches.
        """
        if present_entities is None:
            present_entities = set()
        relevant_entities_found = present_entities
        self._find_entity_uris_in_update_queries(triple, relevant_entities_found)
        if relevant_entities_found:
            args_list = [
                (entity_uri, self.config, self.on_time, self.other_snapshots)
                for entity_uri in relevant_entities_found
            ]
            for result in _run_in_parallel(_reconstruct_entity_worker, args_list):
                if result is not None:
                    entity, entity_graphs, other_snapshots = result
                    self.reconstructed_entities.add(entity)
                    self._merge_entity_result(entity, entity_graphs, other_snapshots)

    def _solve_variables(self) -> None:
        """Iteratively make query variables explicit until none remain or no
        further progress can be made."""
        self.vars_to_explicit_by_time = {}
        self._get_vars_to_explicit_by_time()
        while self._there_are_variables():
            solved_variables = self._explicit_solvable_variables()
            self._align_snapshots()
            if not solved_variables:
                # No progress this round: stop to avoid an infinite loop.
                return
            self._update_vars_to_explicit(solved_variables)
            self._get_vars_to_explicit_by_time()

    def _there_are_variables(self) -> bool:
        """Return True while any pending triple still contains a variable."""
        for triples in self.vars_to_explicit_by_time.values():
            for triple in triples:
                if any(el.startswith('?') for el in triple):
                    return True
        return False

    def _explicit_solvable_variables(self) -> dict:
        """Resolve triples with exactly one variable, in object position.

        Values come from the already-materialized graphs; each newly
        discovered IRI is reconstructed (in parallel) so later rounds can
        continue solving. Returns ``{timestamp: {variable: {solved triples}}}``.
        """
        explicit_triples: dict[str, dict[str, set]] = {}
        for se, triples in self.vars_to_explicit_by_time.items():
            for triple in triples:
                variables = [el for el in triple if el.startswith('?')]
                if len(variables) == 1:
                    variable = variables[0]
                    variable_index = triple.index(variable)
                    if variable_index == 2:
                        matching = [q for q in self.relevant_graphs[se]
                                    if q[0] == triple[0] and q[1] == triple[1]]
                        query_results = [(triple[0], triple[1], q[2]) for q in matching]
                        for row in query_results:
                            explicit_triples.setdefault(se, {})
                            explicit_triples[se].setdefault(variable, set())
                            explicit_triples[se][variable].add(row)
                        # Reconstruct any newly discovered IRI objects.
                        args_list = [
                            (row[2][1:-1], self.config, self.on_time, self.other_snapshots)
                            for row in query_results
                            if row[2].startswith('<') and row[2].endswith('>') and row[2][1:-1] not in self.reconstructed_entities
                        ]
                        for result_data in _run_in_parallel(_reconstruct_entity_worker, args_list):
                            if result_data is not None:
                                entity, entity_graphs, other_snapshots = result_data
                                self.reconstructed_entities.add(entity)
                                self._merge_entity_result(entity, entity_graphs, other_snapshots)
        return explicit_triples

    def _align_snapshots(self) -> None:
        """Merge per-entity graphs into ``self.relevant_graphs`` and carry
        unchanged entities' quads forward across timestamps.

        After merging, timestamps are walked chronologically; an entity that
        has no snapshot of its own at a timestamp inherits its quads from the
        previous timestamp, so every timestamp holds the full picture.
        """
        for snapshots in self.relevant_entities_graphs.values():
            for snapshot, quad_set in snapshots.items():
                if snapshot in self.relevant_graphs:
                    self.relevant_graphs[snapshot].update(quad_set)
                else:
                    self.relevant_graphs[snapshot] = set(quad_set)
        if len(self.relevant_graphs) <= 1:
            # Nothing to carry forward with zero or one timestamp.
            return
        ordered_data = sorted(
            self.relevant_graphs.items(),
            key=lambda x: _parse_datetime(x[0]),
        )
        for index, (se, quad_set) in enumerate(ordered_data):
            if index > 0:
                previous_se = ordered_data[index - 1][0]
                prev_subjects = {q[0] for q in self.relevant_graphs[previous_se]}
                cur_subjects = {q[0] for q in quad_set}
                for subject_n3 in prev_subjects:
                    subject_uri = subject_n3[1:-1] if subject_n3.startswith('<') else subject_n3
                    # Carry forward only entities we track that did not change
                    # at this timestamp (no snapshot of their own here).
                    if subject_n3 not in cur_subjects and subject_uri in self.relevant_entities_graphs and se not in self.relevant_entities_graphs[subject_uri]:
                        for quad in self.relevant_graphs[previous_se]:
                            if quad[0] == subject_n3:
                                self.relevant_graphs[se].add(quad)

    def _update_vars_to_explicit(self, solved_variables: dict):
        """Substitute *solved_variables* into the pending triples, producing
        the next round's (partially) explicit triple sets per timestamp."""
        vars_to_explicit_by_time: dict = {}
        for se, triples in self.vars_to_explicit_by_time.items():
            vars_to_explicit_by_time.setdefault(se, set())
            new_triples = set()
            for triple in triples:
                if se in solved_variables:
                    for solved_var, solved_triples in solved_variables[se].items():
                        if solved_var in triple:
                            for solved_triple in solved_triples:
                                new_triple = None
                                if solved_triple[0] != triple[0] and solved_triple[1] == triple[1]:
                                    # Same predicate but different subject: not this triple's solution.
                                    continue
                                elif solved_triple[0] == triple[0] and solved_triple[1] == triple[1]:
                                    # Exact pattern solved: adopt it as-is.
                                    new_triple = solved_triple
                                else:
                                    # The solved object becomes this triple's subject (chaining).
                                    new_triple = (solved_triple[2], triple[1], triple[2])
                                new_triples.add(new_triple)
                        elif not any(el.startswith('?') for el in triple) or not any(var for var in solved_variables[se] if var in triple):
                            # Fully explicit already, or untouched by this round's solutions.
                            new_triples.add(triple)
            vars_to_explicit_by_time[se] = new_triples
        self.vars_to_explicit_by_time = vars_to_explicit_by_time

    def _get_vars_to_explicit_by_time(self) -> None:
        """Seed each new timestamp in ``self.relevant_graphs`` with the triples
        whose variables still need resolving (computed once, then shared)."""
        relevant_triples = None
        for se in self.relevant_graphs:
            if se not in self.vars_to_explicit_by_time:
                if relevant_triples is None:
                    # Lazily compute the triple set once; reused for every timestamp.
                    relevant_triples = set()
                    for triple in self.triples:
                        if any(el for el in triple if el.startswith('?') and not self._is_a_dead_end(el, triple)) and not self._is_isolated(triple):
                            relevant_triples.add(triple)
                self.vars_to_explicit_by_time[se] = set(relevant_triples)

    def _is_a_dead_end(self, el: str, triple: tuple) -> bool:
        """A variable in object position that never reappears as a subject is
        a dead end: nothing further can be solved through it."""
        return el.startswith('?') and triple.index(el) == 2 and not any(t for t in self.triples if el in t if t.index(el) == 0)
688class VersionQuery(AgnosticQuery):
689 """
690 This class allows time-travel queries, both on a single version and all versions of the dataset.
692 :param query: The SPARQL query string.
693 :type query: str
694 :param on_time: If you want to query a specific version, specify the time interval here. The format is (START, END). If one of the two values is None, only the other is considered. Dates must be in ISO 8601 format.
695 :type on_time: Tuple[Union[str, None]], optional
696 :param config_path: The path to the configuration file.
697 :type config_path: str, optional
698 """
    def __init__(self, query: str, on_time: tuple[str | None, str | None] | None = None, other_snapshots: bool = False, config_path: str = CONFIG_PATH, config_dict: dict | None = None):
        """Initialize the streaming result store, then run the base-class setup.

        See the class docstring for parameter semantics; ``_streaming_results``
        (timestamp -> SELECT bindings) must exist before the base __init__
        triggers _rebuild_relevant_graphs.
        """
        self._streaming_results: dict[str, list[dict]] = {}
        super().__init__(query, on_time, other_snapshots, config_path, config_dict)
    def _rebuild_relevant_graphs(self) -> None:
        """Choose a reconstruction strategy based on the query shape.

        - Single isolated pattern on a bounded interval (no inverse predicate,
          no extra snapshot metadata): batch fast path.
        - Any non-isolated pattern: generic base-class machinery, then convert
          its per-timestamp graphs into streaming bindings.
        - Otherwise: per-entity streaming reconstruction.
        """
        self.triples = self._process_query()
        if self.on_time is not None:
            if (len(self.triples) == 1
                    and self._is_isolated(self.triples[0])
                    and not self.triples[0][1].startswith('^')
                    and not self.other_snapshots):
                self._rebuild_vm_batch()
                return
            super()._rebuild_relevant_graphs()
            return
        if not all(self._is_isolated(t) for t in self.triples):
            # Interconnected patterns need the full variable-solving pipeline;
            # derive SELECT bindings from the materialized graphs afterwards.
            super()._rebuild_relevant_graphs()
            self._streaming_results = {
                str(convert_to_datetime(ts, stringify=True)): self._extract_bindings(g)
                for ts, g in self.relevant_graphs.items()
            }
            return
        self._rebuild_streaming()
    def _discover_entities_parallel(self, triple: tuple) -> set[str]:
        """Collect entity URIs matching *triple* now or in any past version.

        Runs the current-dataset lookup and the provenance (update-query)
        scan concurrently on the shared I/O pool and unions the results.
        """
        fut_present = _IO_EXECUTOR.submit(self._get_present_entities, triple)
        entities_set: set = set()
        # The provenance helper mutates entities_set in place and returns None.
        fut_prov = _IO_EXECUTOR.submit(self._find_entity_uris_in_update_queries, triple, entities_set)
        present_entities = fut_present.result()
        fut_prov.result()
        all_entities = set(present_entities)
        all_entities.update(entities_set)
        return all_entities
    def _rebuild_vm_batch(self) -> None:
        """Fast path for a single isolated pattern on a bounded interval.

        Discovers candidate entities, batch-fetches their provenance and
        current triples in two concurrent SELECTs, rewinds each entity to the
        timestamps inside ``self.on_time``, matches the pattern per version,
        and merges the per-entity bindings into ``self._streaming_results``.
        """
        assert self.on_time is not None
        triple = self.triples[0]
        all_entity_strs = self._discover_entities_parallel(triple)
        if not all_entity_strs:
            return
        # Overlap the two batch SELECTs on the shared I/O pool.
        fut_prov = _IO_EXECUTOR.submit(_batch_query_provenance_snapshots, all_entity_strs, self.config)
        fut_data = _IO_EXECUTOR.submit(_batch_query_dataset_triples, all_entity_strs, self.config)
        prov_data = fut_prov.result()
        dataset_data = fut_data.result()
        entity_bindings: dict[str, dict[str, list[dict]]] = {}
        for entity_str in all_entity_strs:
            per_ts: dict[str, list[dict]] = {}
            for ts, quad_set in _reconstruct_at_time_as_sets(
                prov_data[entity_str], dataset_data[entity_str], self.on_time,
            ):
                per_ts[ts] = _match_single_pattern(triple, quad_set)
            entity_bindings[entity_str] = per_ts
        self._streaming_results = _merge_entity_bindings(entity_bindings)
    def _extract_bindings(self, quads: set[tuple[str, ...]]) -> list[dict]:
        """Evaluate the parsed query patterns directly against *quads*,
        returning deduplicated SPARQL-JSON-style bindings for the SELECT
        variables."""
        # Match the SPARQL query patterns against the quad set.
        # Phase 1: mandatory triples. Start with an empty binding and for each
        # pattern keep only the quads that are compatible with what was already
        # matched.
        bindings: list[dict[str, str]] = [{}]
        for pattern in self._mandatory_triples:
            new_bindings: list[dict[str, str]] = []
            for binding in bindings:
                for quad in quads:
                    new_binding = self._try_match(pattern, quad, binding)
                    if new_binding is not None:
                        new_bindings.append(new_binding)
            bindings = new_bindings
        # Phase 2: OPTIONAL groups. Try to extend each binding, but if nothing
        # matches, keep the binding as-is (no data is lost).
        for opt_group in self._optional_groups:
            bindings = self._left_join(bindings, opt_group, quads)
        # Phase 3: project to SELECT variables and deduplicate.
        seen: set[frozenset] = set()
        result: list[dict] = []
        for b in bindings:
            projected_n3: dict[str, str] = {}
            for var in self._select_vars:
                # Internal bindings are keyed '?var'; projection drops the '?'.
                key = '?' + var
                if key in b:
                    projected_n3[var] = b[key]
            frozen = frozenset(projected_n3.items())
            if frozen not in seen:
                seen.add(frozen)
                result.append({var: _n3_to_binding(val) for var, val in projected_n3.items()})
        return result
786 def _left_join(self, bindings: list[dict], opt_triples: list[tuple], quads: set[tuple[str, ...]]) -> list[dict]:
787 # For each binding, try to add values from the optional patterns.
788 # If a quad matches, the binding grows with new variables.
789 # If nothing matches, the binding is kept unchanged.
790 result: list[dict] = []
791 for binding in bindings:
792 extended: list[dict] = [dict(binding)]
793 for pattern in opt_triples:
794 new_extended: list[dict] = []
795 for b in extended:
796 matched = False
797 for quad in quads:
798 new_b = self._try_match(pattern, quad, b)
799 if new_b is not None:
800 new_extended.append(new_b)
801 matched = True
802 if not matched:
803 new_extended.append(b)
804 extended = new_extended
805 result.extend(extended)
806 return result
808 @staticmethod
809 def _try_match(pattern: tuple, quad: tuple, binding: dict) -> dict | None:
810 # Check if a triple pattern (s, p, o) matches a quad.
811 new_binding = dict(binding)
812 for expected, actual in zip(pattern[:3], quad[:3], strict=True):
813 is_variable = expected.startswith('?')
814 if is_variable and expected in new_binding:
815 # Variable already bound: check consistency
816 if new_binding[expected] != actual:
817 return None
818 elif is_variable:
819 # New variable: bind it
820 new_binding[expected] = actual
821 elif expected != actual:
822 # Fixed term: must match exactly
823 return None
824 return new_binding
826 def _rebuild_streaming(self) -> None:
827 triples_checked = set()
828 all_entity_strs: set[str] = set()
829 use_fast_path = (
830 len(self.triples) == 1
831 and self._is_isolated(self.triples[0])
832 and not self.triples[0][1].startswith('^')
833 )
834 for triple in self.triples:
835 if self._is_a_new_triple(triple, triples_checked):
836 present_entities = self._get_present_entities(triple)
837 prov_entities: set = set()
838 self._find_entity_uris_in_update_queries(triple, prov_entities)
839 all_entity_strs.update(present_entities)
840 all_entity_strs.update(prov_entities)
841 triples_checked.add(triple)
842 if not all_entity_strs:
843 self._streaming_results = {}
844 return
845 if use_fast_path:
846 fut_prov = _IO_EXECUTOR.submit(_batch_query_provenance_snapshots, all_entity_strs, self.config)
847 fut_data = _IO_EXECUTOR.submit(_batch_query_dataset_triples, all_entity_strs, self.config)
848 prov_data = fut_prov.result()
849 dataset_data = fut_data.result()
850 triple = self.triples[0]
851 entity_bindings: dict[str, dict[str, list[dict]]] = {}
852 for entity_str in all_entity_strs:
853 per_ts: dict[str, list[dict]] = {}
854 for ts, quad_set in _iter_versions_as_sets(prov_data[entity_str], dataset_data[entity_str]):
855 per_ts[ts] = _match_single_pattern(triple, quad_set)
856 entity_bindings[entity_str] = per_ts
857 else:
858 entity_bindings = {}
859 for entity_str in all_entity_strs:
860 ae = AgnosticEntity(entity_str, config=self.config, include_related_objects=False, include_merged_entities=False, include_reverse_relations=False)
861 per_ts = {}
862 for ts, quad_set in ae.iter_versions():
863 per_ts[ts] = self._extract_bindings(quad_set)
864 entity_bindings[entity_str] = per_ts
865 self._streaming_results = _merge_entity_bindings(entity_bindings)
867 def run_agnostic_query(self, include_all_timestamps: bool = False) -> tuple[dict[str, list[dict]], set]:
868 if self.on_time is None or self._streaming_results:
869 agnostic_result = self._streaming_results
870 if include_all_timestamps:
871 agnostic_result = self._fill_timestamp_gaps(agnostic_result)
872 return agnostic_result, set()
873 agnostic_result: dict[str, list[dict]] = {}
874 for timestamp, graph in self.relevant_graphs.items():
875 normalized = str(convert_to_datetime(timestamp, stringify=True))
876 agnostic_result[normalized] = self._extract_bindings(graph)
877 return agnostic_result, {data["generatedAtTime"] for _, data in self.other_snapshots_metadata.items()}
    def _get_all_provenance_timestamps(self) -> set:
        """Return every prov:generatedAtTime value found in the triplestore.

        Used by _fill_timestamp_gaps to know the complete timeline of
        snapshots, including timestamps at which the queried entities did
        not change.
        """
        query = f"""
        SELECT ?time WHERE {{
            ?snapshot <{ProvEntity.iri_generated_at_time}> ?time .
        }}
        """
        results = Sparql(query, self.config).run_select_query()
        # SPARQL JSON results format: each binding row maps the variable
        # name to a dict carrying its 'value'.
        return {r['time']['value'] for r in results['results']['bindings']}
888 def _fill_timestamp_gaps(self, result: dict) -> dict:
889 all_timestamps = self._get_all_provenance_timestamps()
890 sorted_result_ts = sorted(result.keys(), key=_parse_datetime)
891 if not sorted_result_ts:
892 return result
893 min_ts = _parse_datetime(sorted_result_ts[0])
894 relevant_timestamps = sorted(
895 [t for t in all_timestamps if min_ts <= _parse_datetime(t)],
896 key=_parse_datetime
897 )
898 filled = dict(result)
899 last_known = None
900 for ts in relevant_timestamps:
901 normalized = convert_to_datetime(ts, stringify=True)
902 if normalized in filled:
903 last_known = normalized
904 elif last_known is not None:
905 filled[normalized] = filled[last_known]
906 return filled
class DeltaQuery(AgnosticQuery):
    """
    Structured queries over the differences between snapshots, either at a
    single time or across time.

    :param query: A SPARQL query string. It is useful to identify the entities whose change you want to investigate.
    :type query: str
    :param on_time: If you want to query specific snapshots, specify the time interval here. The format is (START, END). If one of the two values is None, only the other is considered. Dates must be in ISO 8601 format.
    :type on_time: Tuple[Union[str, None]], optional
    :param changed_properties: A set of properties. It narrows the field to those entities where the properties specified in the set have changed.
    :type changed_properties: Set[str], optional
    :param config_path: The path to the configuration file.
    :type config_path: str, optional
    """
    def __init__(self, query:str, on_time: tuple[str | None, str | None] | None = None, changed_properties:set[str] | None=None, config_path:str = CONFIG_PATH, config_dict: dict | None = None):
        super().__init__(query=query, on_time=on_time, config_path=config_path, config_dict=config_dict)
        # Normalize the mutable default: an absent filter means "any property".
        self.changed_properties = set() if changed_properties is None else changed_properties

    def _rebuild_relevant_graphs(self) -> None:
        """Collect the entities named by the query's triple patterns; only
        non-isolated patterns require full graph alignment."""
        self.triples = self._process_query()
        checked = set()
        alignment_needed = False
        for pattern in self.triples:
            if self._is_isolated(pattern) and self._is_a_new_triple(pattern, checked):
                # Isolated pattern: entity URIs can be gathered directly
                # from the dataset and the provenance update queries.
                self.reconstructed_entities.update(self._get_present_entities(pattern))
                from_updates: set = set()
                self._find_entity_uris_in_update_queries(pattern, from_updates)
                self.reconstructed_entities.update(from_updates)
            else:
                alignment_needed = True
                self._rebuild_relevant_entity(pattern[0])
            checked.add(pattern)
        if alignment_needed:
            self._align_snapshots()
        self._solve_variables()

    def run_agnostic_query(self) -> dict:
        """Run the delta query and return the per-entity change description."""
        uris = set(self.reconstructed_entities)
        if not uris:
            return {}
        provenance = _batch_query_dm_provenance(uris, self.config)
        merged: dict = {}
        for uri in uris:
            entity_snapshots = provenance[uri]
            # Entities without snapshots contribute nothing to the delta.
            if entity_snapshots:
                merged.update(_build_delta_result(
                    uri, entity_snapshots,
                    self.on_time, self.changed_properties,
                ))
        return merged
def get_insert_query(graph_iri: str, data: set[tuple[str, ...]]) -> tuple[str, int]:
    """Build a SPARQL ``INSERT DATA`` query placing *data* into *graph_iri*.

    Each element of *data* supplies subject, predicate and object in its
    first three positions (any further positions are ignored). Returns the
    query string and the triple count, or ``("", 0)`` for empty input.
    """
    if not data:
        return "", 0
    triple_lines = [f"{subj} {pred} {obj} ." for subj, pred, obj, *_ in data]
    statements = "\n".join(triple_lines)
    return f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {statements} }} }}", len(data)