Coverage for oc_ocdm/storer.py: 72%

242 statements  

coverage.py v6.5.0, created at 2025-05-30 22:05 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.
from __future__ import annotations

import json
import os
import time
from datetime import datetime
from typing import TYPE_CHECKING
from zipfile import ZIP_DEFLATED, ZipFile

from filelock import FileLock
from oc_ocdm.graph.graph_entity import GraphEntity
from oc_ocdm.metadata.metadata_entity import MetadataEntity
from oc_ocdm.prov.prov_entity import ProvEntity
from oc_ocdm.reader import Reader
from oc_ocdm.support.query_utils import get_update_query
from oc_ocdm.support.reporter import Reporter
from oc_ocdm.support.support import find_paths
from rdflib import ConjunctiveGraph, URIRef
from SPARQLWrapper import SPARQLWrapper

if TYPE_CHECKING:
    from typing import Any, Dict, List, Optional, Set, Tuple

    from oc_ocdm.abstract_entity import AbstractEntity
    from oc_ocdm.abstract_set import AbstractSet


class Storer(object):

    def __init__(self, abstract_set: AbstractSet, repok: Reporter = None, reperr: Reporter = None,
                 context_map: Dict[str, Any] = None, default_dir: str = "_", dir_split: int = 0,
                 n_file_item: int = 1, output_format: str = "json-ld", zip_output: bool = False,
                 modified_entities: set = None) -> None:
        # We only accept format strings that:
        # 1. are supported by rdflib
        # 2. correspond to an output format which is effectively either NT or NQ
        # The only exception to this rule is the 'json-ld' format, which is the default value of 'output_format'.
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        if output_format not in supported_formats:
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {supported_formats}.")
        else:
            self.output_format: str = output_format
        self.zip_output = zip_output
        self.dir_split: int = dir_split
        self.n_file_item: int = n_file_item
        self.default_dir: str = default_dir if default_dir != "" else "_"
        self.a_set: AbstractSet = abstract_set
        self.modified_entities = modified_entities

        if context_map is not None:
            self.context_map: Dict[str, Any] = context_map
        else:
            self.context_map: Dict[str, Any] = {}

        if self.output_format == "json-ld":
            for context_url in self.context_map:
                ctx_file_path: Any = self.context_map[context_url]
                if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                    # This expensive operation is done only when it's really needed
                    with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                        self.context_map[context_url] = json.load(ctx_f)

        if repok is None:
            self.repok: Reporter = Reporter(prefix="[Storer: INFO] ")
        else:
            self.repok: Reporter = repok

        if reperr is None:
            self.reperr: Reporter = Reporter(prefix="[Storer: ERROR] ")
        else:
            self.reperr: Reporter = reperr

    def store_graphs_in_file(self, file_path: str, context_path: str = None) -> None:
        self.repok.new_article()
        self.reperr.new_article()
        self.repok.add_sentence("Store the graphs into a file: starting process")

        cg: ConjunctiveGraph = ConjunctiveGraph()
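        # Merge every named graph of the set into a single conjunctive graph,
        # carrying each graph's identifier along as the context of its quads.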

        for g in self.a_set.graphs():
            cg.addN([item + (g.identifier,) for item in list(g)])

        self._store_in_file(cg, file_path, context_path)

    def _store_in_file(self, cur_g: ConjunctiveGraph, cur_file_path: str, context_path: str = None) -> None:
        # Note: the following lines, up to the creation of 'cur_json_ld', are a workaround
        # to make sure that all the triples of the input graph (in particular the provenance
        # ones) end up in the stored file; some of them would otherwise not be written.
        new_g: ConjunctiveGraph = ConjunctiveGraph()
        for s, p, o in cur_g.triples((None, None, None)):
            g_iri: Optional[URIRef] = None
            for g_context in cur_g.contexts((s, p, o)):
                g_iri = g_context.identifier
                break
            new_g.addN([(s, p, o, g_iri)])

        zip_file_path = cur_file_path.replace(os.path.splitext(cur_file_path)[1], ".zip")

        if self.zip_output:
            with ZipFile(zip_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
                self._write_graph(new_g, zip_file, cur_file_path, context_path)
        else:
            # Handle non-zipped output directly to a file
            self._write_graph(new_g, None, cur_file_path, context_path)

        self.repok.add_sentence(f"File '{cur_file_path}' added.")

    def _write_graph(self, graph: ConjunctiveGraph, zip_file: ZipFile, cur_file_path, context_path):
        if self.output_format == "json-ld":
            # Serialize the graph in JSON-LD format
            cur_json_ld = json.loads(graph.serialize(format="json-ld", context=self.context_map.get(context_path)))
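            # When the context document is known, reference it by URL from '@context'
            # instead of embedding it, presumably to keep the serialized files small.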

            if context_path is not None and context_path in self.context_map:
                if isinstance(cur_json_ld, dict):
                    cur_json_ld["@context"] = context_path
                else:  # When cur_json_ld is a list
                    for item in cur_json_ld:
                        item["@context"] = context_path

            # Determine how to write based on zip file presence
            if zip_file is not None:
                dumped_json = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=dumped_json)
            else:
                with open(cur_file_path, 'wt', encoding='utf-8') as f:
                    json.dump(cur_json_ld, f, ensure_ascii=False)
        else:
            # Handle other RDF formats
            if zip_file is not None:
                rdf_serialization = graph.serialize(destination=None, format=self.output_format, encoding="utf-8")
                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=rdf_serialization)
            else:
                graph.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")

    def store_all(self, base_dir: str, base_iri: str, context_path: str = None, process_id: int | str = None) -> List[str]:
        self.repok.new_article()
        self.reperr.new_article()

        self.repok.add_sentence("Starting the process")

        relevant_paths: Dict[str, list] = dict()
        for entity in self.a_set.res_to_entity.values():
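            # When a set of modified entities was provided, only those entities are
            # (re)written; the comparison strips any '/prov/se/...' suffix so that
            # provenance snapshots are matched against their parent entity.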

            is_relevant = True
            if self.modified_entities is not None and URIRef(entity.res.split('/prov/se/')[0]) not in self.modified_entities:
                is_relevant = False
            if is_relevant:
                cur_dir_path, cur_file_path = self._dir_and_file_paths(entity.res, base_dir, base_iri, process_id)
                if not os.path.exists(cur_dir_path):
                    os.makedirs(cur_dir_path)
                relevant_paths.setdefault(cur_file_path, list())
                relevant_paths[cur_file_path].append(entity)

        for relevant_path, entities_in_path in relevant_paths.items():
            stored_g = None
            # Here we try to obtain a reference to the currently stored graph
            output_filepath = relevant_path.replace(os.path.splitext(relevant_path)[1], ".zip") if self.zip_output else relevant_path
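            # A per-file lock protects the read-modify-write cycle below, presumably so
            # that concurrent processes writing to the same file do not lose updates.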

            lock = FileLock(f"{output_filepath}.lock")
            with lock:
                if os.path.exists(output_filepath):
                    stored_g = Reader(context_map=self.context_map).load(output_filepath)
                if stored_g is None:
                    stored_g = ConjunctiveGraph()
                for entity_in_path in entities_in_path:
                    self.store(entity_in_path, stored_g, relevant_path, context_path, False)
                self._store_in_file(stored_g, relevant_path, context_path)

        return list(relevant_paths.keys())

    def store(self, entity: AbstractEntity, destination_g: ConjunctiveGraph, cur_file_path: str, context_path: str = None, store_now: bool = True) -> ConjunctiveGraph:
        self.repok.new_article()
        self.reperr.new_article()

        try:
            if isinstance(entity, ProvEntity):
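                # Provenance entities are simply appended: their triples are copied into
                # the destination graph as quads, using the entity's named graph as context.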

                quads: List[Tuple] = []
                graph_identifier: URIRef = entity.g.identifier
                for triple in entity.g.triples((entity.res, None, None)):
                    quads.append((*triple, graph_identifier))
                destination_g.addN(quads)
            elif isinstance(entity, GraphEntity) or isinstance(entity, MetadataEntity):
                if entity.to_be_deleted:
                    destination_g.remove((entity.res, None, None, None))
                else:
                    if len(entity.preexisting_graph) > 0:
                        """
                        The entity already exists in the stored file, so we are not in
                        'append mode': the old copy must be removed before it is overwritten.
                        """
                        destination_g.remove((entity.res, None, None, None))
                    """
                    Here we copy data from the entity into the stored graph.
                    If the entity was marked as to be deleted, then we are already done,
                    because all of its triples have been removed in the branch above.
                    """
                    quads: List[Tuple] = []
                    graph_identifier: URIRef = entity.g.identifier
                    for triple in entity.g.triples((entity.res, None, None)):
                        quads.append((*triple, graph_identifier))
                    destination_g.addN(quads)

            if store_now:
                self._store_in_file(destination_g, cur_file_path, context_path)

            return destination_g
        except Exception as e:
            self.reperr.add_sentence(f"[1] It was impossible to store the RDF statements in {cur_file_path}. {e}")

    def upload_and_store(self, base_dir: str, triplestore_url: str, base_iri: str, context_path: str = None,
                         batch_size: int = 10) -> None:
        stored_graph_path: List[str] = self.store_all(base_dir, base_iri, context_path)

        # If some graphs were not stored properly, then none of them is uploaded to the
        # triplestore. However, the ones that could have been uploaded in principle are
        # flagged with a ".notuploaded" marker.
        if None in stored_graph_path:
            for file_path in stored_graph_path:
                if file_path is not None:
                    # Create a marker for the file not uploaded in the triplestore
                    open(f'{file_path}.notuploaded', 'wt', encoding='utf-8').close()
                    self.reperr.add_sentence("[2] "
                                             f"The statements contained in the JSON-LD file '{file_path}' "
                                             "were not uploaded into the triplestore.")
        else:  # All the files have been stored
            self.upload_all(triplestore_url, base_dir, batch_size)

    def _dir_and_file_paths(self, res: URIRef, base_dir: str, base_iri: str, process_id: int | str = None) -> Tuple[str, str]:
        is_json: bool = (self.output_format == "json-ld")
        return find_paths(res, base_dir, base_iri, self.default_dir, self.dir_split, self.n_file_item, is_json=is_json, process_id=process_id)

    @staticmethod
    def _class_to_entity_type(entity: AbstractEntity) -> Optional[str]:
        if isinstance(entity, GraphEntity):
            return "graph"
        elif isinstance(entity, ProvEntity):
            return "prov"
        elif isinstance(entity, MetadataEntity):
            return "metadata"
        else:
            return None

    def upload_all(self, triplestore_url: str, base_dir: str = None, batch_size: int = 10, save_queries: bool = False) -> bool:
        self.repok.new_article()
        self.reperr.new_article()

        if batch_size <= 0:
            batch_size = 10

        query_string: str = ""
        added_statements: int = 0
        removed_statements: int = 0
        skipped_queries: int = 0
        result: bool = True

        if save_queries:
            to_be_uploaded_dir = os.path.join(base_dir, "to_be_uploaded")
            os.makedirs(to_be_uploaded_dir, exist_ok=True)
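        # The loop below batches the SPARQL updates: per-entity update queries are joined
        # with " ; " and flushed (saved to file or sent to the triplestore) every
        # batch_size entities; any remainder is flushed after the loop.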

        for idx, entity in enumerate(self.a_set.res_to_entity.values()):
            update_query, n_added, n_removed = get_update_query(entity, entity_type=self._class_to_entity_type(entity))

            if update_query == "":
                skipped_queries += 1
            else:
                index = idx - skipped_queries
                if index == 0:
                    # First query
                    query_string = update_query
                    added_statements = n_added
                    removed_statements = n_removed
                elif index % batch_size == 0:
                    # batch_size-multiple query
                    if save_queries:
                        self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
                    else:
                        result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)
                    query_string = update_query
                    added_statements = n_added
                    removed_statements = n_removed
                else:
                    # Accumulated query
                    query_string += " ; " + update_query
                    added_statements += n_added
                    removed_statements += n_removed

        if query_string != "":
            if save_queries:
                self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
            else:
                result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)

        return result

    def _save_query(self, query_string: str, directory: str, added_statements: int, removed_statements: int) -> None:
        timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
        file_name = f"{timestamp}_add{added_statements}_remove{removed_statements}.sparql"
        file_path = os.path.join(directory, file_name)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(query_string)

    def upload(self, entity: AbstractEntity, triplestore_url: str, base_dir: str = None) -> bool:
        self.repok.new_article()
        self.reperr.new_article()

        update_query, n_added, n_removed = get_update_query(entity, entity_type=self._class_to_entity_type(entity))

        return self._query(update_query, triplestore_url, base_dir, n_added, n_removed)

    def execute_query(self, query_string: str, triplestore_url: str) -> bool:
        self.repok.new_article()
        self.reperr.new_article()

        return self._query(query_string, triplestore_url)

    def _query(self, query_string: str, triplestore_url: str, base_dir: str = None,
               added_statements: int = 0, removed_statements: int = 0) -> bool:
        if query_string != "":
            attempt = 0
            max_attempts = 3
            wait_time = 5  # Initial wait time in seconds
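            # Up to max_attempts POST requests are made, doubling the wait between retries;
            # if they all fail and a base_dir was given, the failed query is dumped under
            # 'tp_err' so that it can be replayed later.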

            while attempt < max_attempts:
                try:
                    sparql: SPARQLWrapper = SPARQLWrapper(triplestore_url)
                    sparql.setQuery(query_string)
                    sparql.setMethod('POST')

                    sparql.query()

                    self.repok.add_sentence(
                        f"Triplestore updated with {added_statements} added statements and "
                        f"with {removed_statements} removed statements.")

                    return True

                except Exception as e:
                    attempt += 1
                    self.reperr.add_sentence("[3] "
                                             f"Attempt {attempt} failed. Graph was not loaded into the "
                                             f"triplestore due to communication problems: {e}")
                    if attempt < max_attempts:
                        self.reperr.add_sentence(f"Retrying in {wait_time} seconds...")
                        time.sleep(wait_time)
                        wait_time *= 2  # Double the wait time for the next attempt

                    if base_dir is not None and attempt == max_attempts:
                        self.reperr.add_sentence("[3] "
                                                 "Graph was not loaded into the "
                                                 f"triplestore due to communication problems: {e}")
                        tp_err_dir: str = base_dir + os.sep + "tp_err"
                        if not os.path.exists(tp_err_dir):
                            os.makedirs(tp_err_dir)
                        cur_file_err: str = tp_err_dir + os.sep + \
                            datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt')
                        with open(cur_file_err, 'wt', encoding='utf-8') as f:
                            f.write(query_string)

        return False
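
# A minimal usage sketch (assumptions: a populated GraphSet named 'g_set' and a local
# triplestore endpoint; neither is defined in this module):
#
#     from oc_ocdm.graph import GraphSet
#     from oc_ocdm.storer import Storer
#
#     storer = Storer(g_set, output_format="json-ld", zip_output=False)
#     storer.store_all("./rdf/", "https://w3id.org/oc/meta/")
#     storer.upload_all("http://localhost:9999/blazegraph/sparql", base_dir="./rdf/")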