Coverage for src / time_agnostic_library / sparql.py: 100%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-03-21 11:54 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7 

import atexit
import threading
import zipfile

from rdflib import Dataset
from rdflib.term import BNode, Literal, URIRef
from sparqlite import SPARQLClient

from time_agnostic_library.prov_entity import ProvEntity

17 

# Names exported by a star-import of this module. The underscore-prefixed N3
# helpers are listed explicitly, so they are exported as well.
__all__ = [
    "Sparql",
    "_binding_to_n3",
    "_n3_to_binding",
    "_n3_value",
]

# Relative path of a JSON configuration file.
# NOTE(review): not referenced by the code visible in this module; presumably
# consumed by importers - confirm before removing.
CONFIG_PATH = "./config.json"

# Provenance property strings, materialised once at import time. Sparql checks
# whether any of them occurs in the query text to choose its data source.
_PROV_PROPERTY_STRINGS: tuple[str, ...] = tuple(ProvEntity.get_prov_properties())

# SPARQL clients keyed by (endpoint URL, thread identifier); every access goes
# through _client_lock.
_client_cache: dict[tuple[str, int], SPARQLClient] = {}
_client_lock = threading.Lock()

31 

32 

def _get_client(url: str) -> SPARQLClient:
    """Return the cached SPARQL client for *url* and the calling thread.

    Clients are stored under the pair ``(url, threading.get_ident())``. The
    first request for a pair creates a ``SPARQLClient`` and stores it; later
    requests for the same pair return that same object. The lookup and the
    insertion both happen while ``_client_lock`` is held.
    """
    cache_key = (url, threading.get_ident())
    with _client_lock:
        cached = _client_cache.get(cache_key)
        if cached is not None:
            return cached
        created = SPARQLClient(url)
        _client_cache[cache_key] = created
        return created

41 

42 

def _close_all_clients() -> None:
    """Close every cached SPARQL client, then empty the cache.

    The whole operation runs while ``_client_lock`` is held. The cache is only
    cleared after all ``close()`` calls have returned.
    """
    with _client_lock:
        for cached_client in _client_cache.values():
            cached_client.close()
        _client_cache.clear()


# Release the cached clients when the interpreter shuts down.
atexit.register(_close_all_clients)

51 

52 

53def _escape_n3(v: str) -> str: 

54 return v.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r') 

55 

56 

57def _binding_to_n3(val: dict) -> str: 

58 if val['type'] == 'uri': 

59 return f"<{val['value']}>" 

60 if val['type'] == 'bnode': 

61 return f"_:{val['value']}" 

62 escaped = _escape_n3(val['value']) 

63 if 'datatype' in val: 

64 return f'"{escaped}"^^<{val["datatype"]}>' 

65 if 'xml:lang' in val: 

66 return f'"{escaped}"@{val["xml:lang"]}' 

67 return f'"{escaped}"' 

68 

69 

70def _find_closing_quote(n3: str) -> int: 

71 pos = n3.find('"', 1) 

72 while pos > 0: 

73 num_backslashes = 0 

74 check = pos - 1 

75 while check >= 1 and n3[check] == '\\': 

76 num_backslashes += 1 

77 check -= 1 

78 if num_backslashes % 2 == 0: 

79 return pos 

80 pos = n3.find('"', pos + 1) 

81 return -1 

82 

83 

84def _unescape_n3(raw: str) -> str: 

85 out: list[str] = [] 

86 i = 0 

87 while i < len(raw): 

88 if raw[i] == '\\' and i + 1 < len(raw): 

89 nxt = raw[i + 1] 

90 if nxt == 'n': 

91 out.append('\n') 

92 elif nxt == 'r': 

93 out.append('\r') 

94 elif nxt == '"': 

95 out.append('"') 

96 elif nxt == '\\': 

97 out.append('\\') 

98 else: 

99 out.append(raw[i]) 

100 out.append(nxt) 

101 i += 2 

102 else: 

103 out.append(raw[i]) 

104 i += 1 

105 return ''.join(out) 

106 

107 

def _parse_n3_literal(n3: str) -> tuple[str, str]:
    """Split an N3 literal into its unescaped text and its trailing suffix.

    Returns ``(text, suffix)``. ``text`` is the unescaped content between
    index 1 and the closing quote, and ``suffix`` is everything after that
    quote. When no closing quote is found the input is returned unchanged with
    an empty suffix.
    """
    closing = _find_closing_quote(n3)
    if closing != -1:
        body = n3[1:closing]
        suffix = n3[closing + 1:]
        return _unescape_n3(body), suffix
    return n3, ''

114 

115 

116def _n3_value(n3: str) -> str: 

117 if n3.startswith('<') and n3.endswith('>'): 

118 return n3[1:-1] 

119 if n3.startswith('_:'): 

120 return n3[2:] 

121 value, _ = _parse_n3_literal(n3) 

122 return value 

123 

124 

125def _n3_to_binding(n3: str) -> dict: 

126 if n3.startswith('<') and n3.endswith('>'): 

127 return {'type': 'uri', 'value': n3[1:-1]} 

128 if n3.startswith('_:'): 

129 return {'type': 'bnode', 'value': n3[2:]} 

130 value, rest = _parse_n3_literal(n3) 

131 if rest.startswith('^^<') and rest.endswith('>'): 

132 return {'type': 'literal', 'value': value, 'datatype': rest[3:-1]} 

133 if rest.startswith('@'): 

134 return {'type': 'literal', 'value': value, 'xml:lang': rest[1:]} 

135 return {'type': 'literal', 'value': value} 

136 

137 

class Sparql:
    """Run a SPARQL query against the sources configured for it.

    The query is routed to the ``"provenance"`` section of *config* when its
    text contains any of the provenance property strings, and to the
    ``"dataset"`` section otherwise. The selected section is expected to
    provide the ``"file_paths"`` and ``"triplestore_urls"`` keys.
    """

    def __init__(self, query: str, config: dict):
        """Store the query and pick the configuration section it targets.

        Raises:
            KeyError: if *config* lacks the selected section.
        """
        self.query = query
        self.config = config
        targets_provenance = any(uri in query for uri in _PROV_PROPERTY_STRINGS)
        self.storer: dict = config["provenance" if targets_provenance else "dataset"]

    def run_select_query(self) -> dict:
        """Run the SELECT query on every configured file and triplestore.

        Returns:
            A dict shaped like a SPARQL JSON result, with ``head.vars`` and
            ``results.bindings``. Files are queried first, then triplestores,
            and the bindings of all sources are concatenated.
        """
        output = {'head': {'vars': []}, 'results': {'bindings': []}}
        if self.storer["file_paths"]:
            output = self._get_results_from_files(output)
        if self.storer["triplestore_urls"]:
            output = self._get_results_from_triplestores(output)
        return output

    def _get_results_from_files(self, output: dict) -> dict:
        """Append to *output* the bindings found in the configured files.

        Each file is parsed as JSON-LD into a fresh rdflib ``Dataset``. For a
        path ending in ``.zip`` only the first archive member is read.
        ``head.vars`` is overwritten with the variables of the most recently
        queried file.
        """
        storer: list[str] = self.storer["file_paths"]
        for file_path in storer:
            file_cg = Dataset(default_union=True)
            if file_path.endswith('.zip'):
                with zipfile.ZipFile(file_path, 'r') as z, z.open(z.namelist()[0]) as file:
                    file_cg.parse(file=file, format="json-ld")  # type: ignore[arg-type]
            else:
                file_cg.parse(location=file_path, format="json-ld")
            query_results = file_cg.query(self.query)
            assert query_results.vars is not None
            vars_list = [str(var) for var in query_results.vars]
            output['head']['vars'] = vars_list
            for result in query_results:
                binding = {}
                for var in vars_list:
                    value = result[var]  # type: ignore[index]
                    # Unbound variables are left out of the binding.
                    if value is not None:
                        binding[var] = self._format_result_value(value)
                output['results']['bindings'].append(binding)
        return output

    def _get_results_from_triplestores(self, output: dict) -> dict:
        """Append to *output* the bindings returned by each triplestore.

        ``head.vars`` is taken from the first response only when *output* has
        no variables yet.
        """
        for url in self.storer["triplestore_urls"]:
            results = _get_client(url).query(self.query)
            if not output['head']['vars']:
                output['head']['vars'] = results['head']['vars']
            output['results']['bindings'].extend(results['results']['bindings'])
        return output

    @staticmethod
    def _format_result_value(value) -> dict:
        """Encode an rdflib term as a SPARQL JSON result binding.

        IRIs become ``'uri'`` bindings, blank nodes ``'bnode'`` bindings and
        literals ``'literal'`` bindings carrying ``'datatype'`` and/or
        ``'xml:lang'`` when the term has them. Any other value is stringified
        into a plain ``'literal'`` binding.
        """
        if isinstance(value, URIRef):
            return {'type': 'uri', 'value': str(value)}
        if isinstance(value, BNode):
            # Blank nodes must keep their own type; otherwise _binding_to_n3
            # would serialise them as quoted literals instead of ``_:label``.
            return {'type': 'bnode', 'value': str(value)}
        if isinstance(value, Literal):
            result = {'type': 'literal', 'value': str(value)}
            if value.datatype:
                result['datatype'] = str(value.datatype)
            if value.language:
                result['xml:lang'] = value.language
            return result
        return {'type': 'literal', 'value': str(value)}

    def run_select_to_quad_set(self) -> set[tuple[str, ...]]:
        """Run the SELECT query and return its rows as tuples of N3 strings.

        Each tuple follows the order of ``head.vars``. Rows in which any of
        those variables is unbound are dropped.
        """
        results = self.run_select_query()
        vars_list = results['head']['vars']
        output: set[tuple[str, ...]] = set()
        for binding in results['results']['bindings']:
            if all(var in binding for var in vars_list):
                output.add(tuple(_binding_to_n3(binding[var]) for var in vars_list))
        return output

    def run_ask_query(self) -> bool:
        """Run the ASK query against the configured triplestores.

        Returns:
            ``True`` as soon as one triplestore answers true, ``False`` when
            none does or when no triplestore is configured. Configured files
            are not consulted.
        """
        # Every endpoint is a candidate, mirroring how SELECT results are
        # gathered from all of them; any() stops at the first positive answer.
        return any(
            _get_client(url).ask(self.query)
            for url in self.storer["triplestore_urls"]
        )

    @classmethod
    def _get_tuples_set(cls, result_dict: dict, output: set, vars_list: list) -> None:
        """Add to *output* one tuple with the values of *vars_list*.

        For each variable, the tuple holds ``str(val["value"])`` when the
        entry is a dict with a ``"value"`` key, ``str(val)`` for any other
        entry, and ``None`` when the variable is absent from *result_dict*.
        """
        results_list = []
        for var in vars_list:
            key = str(var)
            if key not in result_dict:
                results_list.append(None)
                continue
            val = result_dict[key]
            if isinstance(val, dict) and "value" in val:
                results_list.append(str(val["value"]))
            else:
                results_list.append(str(val))
        output.add(tuple(results_list))