Coverage for src / time_agnostic_library / sparql.py: 100%
180 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-03-21 11:54 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-03-21 11:54 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
8import atexit
9import threading
10import zipfile
12from rdflib import Dataset
13from rdflib.term import Literal, URIRef
14from sparqlite import SPARQLClient
16from time_agnostic_library.prov_entity import ProvEntity
# Names exported by a star-import of this module.  The underscore-prefixed
# N3 helpers are listed explicitly, so they are exported despite their
# private-looking names.
__all__ = [
    "Sparql",
    "_binding_to_n3",
    "_n3_to_binding",
    "_n3_value",
]
# Relative path of a JSON configuration file.  Nothing else in this module
# reads it (presumably consumed by callers -- TODO confirm).
CONFIG_PATH = "./config.json"

# Provenance property strings.  Sparql.__init__ searches for them as
# substrings of the query text to pick the provenance or the dataset store.
_PROV_PROPERTY_STRINGS: tuple[str, ...] = tuple(ProvEntity.get_prov_properties())

# Clients created by _get_client, keyed by (endpoint URL, thread ident).
_client_cache: dict[tuple[str, int], SPARQLClient] = {}
# Held by _get_client and _close_all_clients while they touch _client_cache.
_client_lock = threading.Lock()
def _get_client(url: str) -> SPARQLClient:
    """Return the SPARQL client cached for *url* in the calling thread.

    Clients are stored under a ``(url, thread ident)`` key, so every thread
    gets its own client per endpoint.  A missing client is created and
    cached while the cache lock is held.
    """
    cache_key = (url, threading.get_ident())
    with _client_lock:
        cached = _client_cache.get(cache_key)
        if cached is not None:
            return cached
        fresh = SPARQLClient(url)
        _client_cache[cache_key] = fresh
        return fresh
def _close_all_clients() -> None:
    """Close every cached SPARQL client, then empty the cache.

    The whole operation runs while the cache lock is held.
    """
    with _client_lock:
        cached_clients = _client_cache.values()
        for sparql_client in cached_clients:
            sparql_client.close()
        _client_cache.clear()


# Release all cached clients when the interpreter shuts down.
atexit.register(_close_all_clients)
53def _escape_n3(v: str) -> str:
54 return v.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r')
57def _binding_to_n3(val: dict) -> str:
58 if val['type'] == 'uri':
59 return f"<{val['value']}>"
60 if val['type'] == 'bnode':
61 return f"_:{val['value']}"
62 escaped = _escape_n3(val['value'])
63 if 'datatype' in val:
64 return f'"{escaped}"^^<{val["datatype"]}>'
65 if 'xml:lang' in val:
66 return f'"{escaped}"@{val["xml:lang"]}'
67 return f'"{escaped}"'
70def _find_closing_quote(n3: str) -> int:
71 pos = n3.find('"', 1)
72 while pos > 0:
73 num_backslashes = 0
74 check = pos - 1
75 while check >= 1 and n3[check] == '\\':
76 num_backslashes += 1
77 check -= 1
78 if num_backslashes % 2 == 0:
79 return pos
80 pos = n3.find('"', pos + 1)
81 return -1
84def _unescape_n3(raw: str) -> str:
85 out: list[str] = []
86 i = 0
87 while i < len(raw):
88 if raw[i] == '\\' and i + 1 < len(raw):
89 nxt = raw[i + 1]
90 if nxt == 'n':
91 out.append('\n')
92 elif nxt == 'r':
93 out.append('\r')
94 elif nxt == '"':
95 out.append('"')
96 elif nxt == '\\':
97 out.append('\\')
98 else:
99 out.append(raw[i])
100 out.append(nxt)
101 i += 2
102 else:
103 out.append(raw[i])
104 i += 1
105 return ''.join(out)
def _parse_n3_literal(n3: str) -> tuple[str, str]:
    """Split a quoted N3 literal into its unescaped text and its suffix.

    Returns ``(text, rest)``: ``text`` is the unescaped content between the
    character at index 0 and the closing quote, and ``rest`` is whatever
    follows the closing quote.  When no closing quote is found the input is
    returned unchanged, paired with an empty suffix.
    """
    closing = _find_closing_quote(n3)
    if closing != -1:
        body, suffix = n3[1:closing], n3[closing + 1:]
        return _unescape_n3(body), suffix
    return n3, ''
116def _n3_value(n3: str) -> str:
117 if n3.startswith('<') and n3.endswith('>'):
118 return n3[1:-1]
119 if n3.startswith('_:'):
120 return n3[2:]
121 value, _ = _parse_n3_literal(n3)
122 return value
125def _n3_to_binding(n3: str) -> dict:
126 if n3.startswith('<') and n3.endswith('>'):
127 return {'type': 'uri', 'value': n3[1:-1]}
128 if n3.startswith('_:'):
129 return {'type': 'bnode', 'value': n3[2:]}
130 value, rest = _parse_n3_literal(n3)
131 if rest.startswith('^^<') and rest.endswith('>'):
132 return {'type': 'literal', 'value': value, 'datatype': rest[3:-1]}
133 if rest.startswith('@'):
134 return {'type': 'literal', 'value': value, 'xml:lang': rest[1:]}
135 return {'type': 'literal', 'value': value}
class Sparql:
    """Run a SPARQL query against the stores named in a configuration.

    On construction the query text is scanned for the provenance property
    strings.  If any of them occurs, ``config["provenance"]`` becomes the
    store description, otherwise ``config["dataset"]`` is used.  The chosen
    section must provide the ``"file_paths"`` and ``"triplestore_urls"``
    entries that the query methods read.
    """

    def __init__(self, query:str, config:dict):
        """Store *query* and *config* and select the store section to use.

        Raises:
            KeyError: if the selected section is missing from *config*.
        """
        self.query = query
        self.config = config
        targets_provenance = any(uri in query for uri in _PROV_PROPERTY_STRINGS)
        self.storer: dict = config["provenance" if targets_provenance else "dataset"]

    def run_select_query(self) -> dict:
        """Run the query on every configured file and triplestore.

        Returns:
            A dictionary shaped like a SPARQL JSON result:
            ``{'head': {'vars': [...]}, 'results': {'bindings': [...]}}``.
            Bindings from files come first, then those from triplestores.
        """
        output = {'head': {'vars': []}, 'results': {'bindings': []}}
        if self.storer["file_paths"]:
            output = self._get_results_from_files(output)
        if self.storer["triplestore_urls"]:
            output = self._get_results_from_triplestores(output)
        return output

    def _get_results_from_files(self, output: dict) -> dict:
        """Query every JSON-LD file of the store and append its bindings.

        Paths ending in ``.zip`` are opened as archives and only their first
        member is parsed.  ``output['head']['vars']`` is overwritten with the
        variables of each file's result, so it ends up holding those of the
        last file processed.
        """
        file_paths: list[str] = self.storer["file_paths"]
        for file_path in file_paths:
            file_cg = Dataset(default_union=True)
            if file_path.endswith('.zip'):
                with zipfile.ZipFile(file_path, 'r') as z, z.open(z.namelist()[0]) as file:
                    file_cg.parse(file=file, format="json-ld")  # type: ignore[arg-type]
            else:
                file_cg.parse(location=file_path, format="json-ld")
            query_results = file_cg.query(self.query)
            # Narrows the optional ``vars`` attribute before iterating it.
            assert query_results.vars is not None
            vars_list = [str(var) for var in query_results.vars]
            output['head']['vars'] = vars_list
            for result in query_results:
                # Unbound variables are left out of the binding entirely.
                binding = {}
                for var in vars_list:
                    value = result[var]  # type: ignore[index]
                    if value is not None:
                        binding[var] = self._format_result_value(value)
                output['results']['bindings'].append(binding)
        return output

    def _get_results_from_triplestores(self, output: dict) -> dict:
        """Query every triplestore URL of the store and append its bindings.

        ``output['head']['vars']`` is filled from the first response seen
        while it is still empty; later responses only contribute bindings.
        """
        for url in self.storer["triplestore_urls"]:
            results = _get_client(url).query(self.query)
            if not output['head']['vars']:
                output['head']['vars'] = results['head']['vars']
            output['results']['bindings'].extend(results['results']['bindings'])
        return output

    @staticmethod
    def _format_result_value(value) -> dict:
        """Convert an rdflib term into a SPARQL JSON binding dictionary.

        IRIs map to ``'uri'``, blank nodes to ``'bnode'`` and literals to
        ``'literal'`` (with ``'datatype'`` / ``'xml:lang'`` when set).  Any
        other object is stringified and reported as a plain literal.
        """
        # Local import: the module header only imports Literal and URIRef.
        from rdflib.term import BNode

        if isinstance(value, URIRef):
            return {'type': 'uri', 'value': str(value)}
        if isinstance(value, BNode):
            # Same type tag that triplestore responses and _binding_to_n3
            # use for blank nodes, so both result sources agree.
            return {'type': 'bnode', 'value': str(value)}
        if isinstance(value, Literal):
            result = {'type': 'literal', 'value': str(value)}
            if value.datatype:
                result['datatype'] = str(value.datatype)
            if value.language:
                result['xml:lang'] = value.language
            return result
        return {'type': 'literal', 'value': str(value)}

    def run_select_to_quad_set(self) -> set[tuple[str, ...]]:
        """Run the SELECT query and return its rows as tuples of N3 strings.

        Each tuple follows the order of the result variables.  Rows in which
        any variable is unbound are dropped.
        """
        results = self.run_select_query()
        vars_list = results['head']['vars']
        output: set[tuple[str, ...]] = set()
        for binding in results['results']['bindings']:
            if all(var in binding for var in vars_list):
                output.add(tuple(_binding_to_n3(binding[var]) for var in vars_list))
        return output

    def run_ask_query(self) -> bool:
        """Run the ASK query against the configured triplestores.

        Every URL in ``"triplestore_urls"`` is consulted in order until one
        answers ``True``.  Local files are not consulted.

        Returns:
            ``True`` if any triplestore answers ``True``; ``False`` if all
            answer ``False`` or no triplestore is configured.
        """
        return any(
            _get_client(url).ask(self.query)
            for url in self.storer["triplestore_urls"]
        )

    @classmethod
    def _get_tuples_set(cls, result_dict:dict, output:set, vars_list: list) -> None:
        """Add one result row to *output* as a tuple of plain strings.

        For each variable in *vars_list* the tuple holds ``None`` when the
        variable is absent from *result_dict*, the stringified ``"value"``
        entry when the bound item is a dictionary containing one, and the
        stringified item otherwise.
        """
        row = []
        for var in vars_list:
            name = str(var)
            if name not in result_dict:
                row.append(None)
                continue
            val = result_dict[name]
            if isinstance(val, dict) and "value" in val:
                row.append(str(val["value"]))
            else:
                row.append(str(val))
        output.add(tuple(row))