Coverage for oc_ocdm / storer.py: 85%

228 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-28 18:52 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com> 

4# SPDX-FileCopyrightText: 2022-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8# -*- coding: utf-8 -*- 

9from __future__ import annotations 

10 

11import hashlib 

12import json 

13import os 

14from datetime import datetime 

15from typing import TYPE_CHECKING 

16from zipfile import ZIP_DEFLATED, ZipFile 

17 

18from filelock import FileLock 

19 

20from oc_ocdm.graph.graph_entity import GraphEntity 

21from oc_ocdm.metadata.metadata_entity import MetadataEntity 

22from oc_ocdm.prov.prov_entity import ProvEntity 

23from oc_ocdm.reader import Reader 

24from oc_ocdm.support.query_utils import get_update_query 

25from oc_ocdm.support.reporter import Reporter 

26from oc_ocdm.support.support import find_paths 

27from rdflib import Dataset, URIRef 

28from sparqlite import SPARQLClient, EndpointError 

29 

30if TYPE_CHECKING: 

31 from typing import Any, Dict, List, Set, Tuple 

32 

33 from oc_ocdm.abstract_entity import AbstractEntity 

34 from oc_ocdm.abstract_set import AbstractSet 

35 

36 

class Storer(object):
    """
    Persist the entities of an AbstractSet as RDF files on disk and/or as
    SPARQL update queries sent to (or saved for) a triplestore.
    """

    def __init__(self, abstract_set: AbstractSet, repok: Reporter | None = None, reperr: Reporter | None = None,
                 context_map: Dict[str, Any] | None = None, default_dir: str = "_", dir_split: int = 0,
                 n_file_item: int = 1, output_format: str = "json-ld", zip_output: bool = False, modified_entities: set | None = None) -> None:
        """
        :param abstract_set: the set of entities to be stored.
        :param repok: reporter for informational messages (a default one is created if None).
        :param reperr: reporter for error messages (a default one is created if None).
        :param context_map: mapping from JSON-LD context URLs to either a local
            file path or an already-parsed context document.
        :param default_dir: fallback directory name ("_" is used if the empty string is given).
        :param dir_split: directory-splitting threshold (0 = no split).
        :param n_file_item: maximum number of entities per file.
        :param output_format: one of the supported rdflib serialization formats.
        :param zip_output: if True, serializations are written inside ZIP archives.
        :param modified_entities: if not None, only entities whose base resource
            (provenance IRIs are truncated at '/prov/se/') is contained in this
            set are processed by store_all/upload_all.
        :raises ValueError: if output_format is not supported.
        """
        # We only accept format strings that:
        # 1. are supported by rdflib
        # 2. correspond to an output format which is effectively either NT or NQ
        # The only exception to this rule is the 'json-ld' format, which is the default value of 'output_format'.
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        if output_format not in supported_formats:
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {supported_formats}.")
        self.output_format: str = output_format
        self.zip_output = zip_output
        self.dir_split: int = dir_split
        self.n_file_item: int = n_file_item
        self.default_dir: str = default_dir if default_dir != "" else "_"
        self.a_set: AbstractSet = abstract_set
        self.modified_entities = modified_entities

        self.context_map: Dict[str, Any] = context_map if context_map is not None else {}

        if self.output_format == "json-ld":
            for context_url in self.context_map:
                ctx_file_path: Any = self.context_map[context_url]
                # isinstance (rather than a type-equality check) also accepts str subclasses.
                if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                    # This expensive operation is done only when it's really needed
                    with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                        self.context_map[context_url] = json.load(ctx_f)

        self.repok: Reporter = repok if repok is not None else Reporter(prefix="[Storer: INFO] ")
        self.reperr: Reporter = reperr if reperr is not None else Reporter(prefix="[Storer: ERROR] ")

    def store_graphs_in_file(self, file_path: str, context_path: str | None = None) -> None:
        """Serialize every graph of the set into a single file at file_path."""
        self.repok.new_article()
        self.reperr.new_article()
        self.repok.add_sentence("Store the graphs into a file: starting process")

        cg: Dataset = Dataset()
        for g in self.a_set.graphs():
            # Re-add each triple as a quad, preserving its original graph identifier.
            cg.addN(item + (g.identifier,) for item in g)  # type: ignore[arg-type]

        self._store_in_file(cg, file_path, context_path)

    def _store_in_file(self, cur_g: Dataset, cur_file_path: str, context_path: str | None = None) -> None:
        """Write cur_g to disk, optionally wrapped in a ZIP archive."""
        if self.zip_output:
            # The archive replaces the original extension with ".zip"; the
            # entry inside the archive keeps the original file name.
            zip_file_path = cur_file_path.replace(os.path.splitext(cur_file_path)[1], ".zip")
            with ZipFile(zip_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
                self._write_graph(cur_g, zip_file, cur_file_path, context_path)
        else:
            self._write_graph(cur_g, None, cur_file_path, context_path)

        self.repok.add_sentence(f"File '{cur_file_path}' added.")

    def _write_graph(self, graph: Dataset, zip_file: ZipFile | None, cur_file_path: str, context_path: str | None) -> None:
        """
        Serialize graph either into zip_file (when given) or directly to
        cur_file_path, honoring self.output_format.
        """
        if self.output_format == "json-ld":
            if context_path is not None and context_path in self.context_map:
                # Serialize with the full context document, then shrink the
                # output by replacing it with a reference to the context URL.
                cur_json_ld = json.loads(graph.serialize(format="json-ld", context=self.context_map[context_path]))
                if isinstance(cur_json_ld, dict):
                    cur_json_ld["@context"] = context_path
                else:
                    for item in cur_json_ld:
                        item["@context"] = context_path
                if zip_file is not None:
                    data = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=data)
                else:
                    with open(cur_file_path, 'wt', encoding='utf-8') as f:
                        json.dump(cur_json_ld, f, ensure_ascii=False)
            else:
                # No known context: plain JSON-LD serialization.
                if zip_file is not None:
                    data = graph.serialize(format="json-ld").encode('utf-8')
                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=data)
                else:
                    graph.serialize(destination=cur_file_path, format="json-ld")
        else:
            # Handle other RDF formats (the NT/NQ family accepted by __init__).
            if zip_file is not None:
                rdf_serialization = graph.serialize(destination=None, format=self.output_format, encoding="utf-8")
                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=rdf_serialization)
            else:
                graph.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")

    def store_all(self, base_dir: str, base_iri: str, context_path: str | None = None, process_id: int | str | None = None) -> List[str]:
        """
        Store every relevant entity of the set under base_dir, grouping the
        entities by destination file and merging them with previously stored data.

        :return: the list of file paths that were (re)written.
        """
        self.repok.new_article()
        self.reperr.new_article()

        self.repok.add_sentence("Starting the process")

        relevant_paths: Dict[str, list] = dict()
        created_dirs = set()
        for entity in self.a_set.res_to_entity.values():
            # When a set of modified entities is provided, skip everything else.
            # Provenance IRIs are reduced to their base resource before the check
            # (str() mirrors the normalization used in upload_all).
            if self.modified_entities is not None and URIRef(str(entity.res).split('/prov/se/')[0]) not in self.modified_entities:
                continue
            cur_dir_path, cur_file_path = self._dir_and_file_paths(entity.res, base_dir, base_iri, process_id)
            if cur_dir_path not in created_dirs:
                os.makedirs(cur_dir_path, exist_ok=True)
                created_dirs.add(cur_dir_path)
            relevant_paths.setdefault(cur_file_path, list()).append(entity)

        for relevant_path, entities_in_path in relevant_paths.items():
            stored_g = None
            # Here we try to obtain a reference to the currently stored graph
            output_filepath = relevant_path.replace(os.path.splitext(relevant_path)[1], ".zip") if self.zip_output else relevant_path
            # A file lock prevents concurrent processes from writing the same file.
            lock = FileLock(f"{output_filepath}.lock")
            with lock:
                if os.path.exists(output_filepath):
                    stored_g = Reader(context_map=self.context_map).load(output_filepath)
                if stored_g is None:
                    stored_g = Dataset()
                for entity_in_path in entities_in_path:
                    # store_now=False: the merged graph is written once, below.
                    self.store(entity_in_path, stored_g, relevant_path, context_path, False)
                self._store_in_file(stored_g, relevant_path, context_path)

        return list(relevant_paths.keys())

    @staticmethod
    def _copy_entity_quads(entity: AbstractEntity, destination_g: Dataset) -> None:
        """Copy all triples about entity.res into destination_g, preserving the graph identifier."""
        quads: List[Tuple] = []
        graph_identifier = URIRef(str(entity.g.identifier))
        for triple in entity.g.triples((entity.res, None, None)):
            quads.append((*triple, graph_identifier))
        destination_g.addN(quads)

    def store(self, entity: AbstractEntity, destination_g: Dataset, cur_file_path: str, context_path: str | None = None, store_now: bool = True) -> Dataset | None:
        """
        Merge entity into destination_g and, when store_now is True, persist the
        resulting graph to cur_file_path.

        :return: destination_g on success, None if an error was recorded.
        """
        self.repok.new_article()
        self.reperr.new_article()

        try:
            if isinstance(entity, ProvEntity):
                # Provenance entities are append-only: copy their quads as-is.
                self._copy_entity_quads(entity, destination_g)
            elif isinstance(entity, GraphEntity) or isinstance(entity, MetadataEntity):
                if entity.to_be_deleted:
                    destination_g.remove((entity.res, None, None, None))  # type: ignore[arg-type]
                else:
                    if len(entity.preexisting_graph) > 0:
                        # We're not in 'append mode', so we need to remove
                        # the entity that we're going to overwrite.
                        destination_g.remove((entity.res, None, None, None))  # type: ignore[arg-type]
                    # Here we copy data from the entity into the stored graph.
                    # If the entity was marked as to be deleted, then we're
                    # done because we already removed all of its triples.
                    self._copy_entity_quads(entity, destination_g)

            if store_now:
                self._store_in_file(destination_g, cur_file_path, context_path)

            return destination_g
        except Exception as e:
            self.reperr.add_sentence(f"[1] It was impossible to store the RDF statements in {cur_file_path}. {e}")

    def upload_and_store(self, base_dir: str, triplestore_url: str, base_iri: str, context_path: str | None = None,
                         batch_size: int = 10) -> None:
        """Store all graphs on disk and, if that succeeded, upload them to the triplestore."""
        stored_graph_path: List[str] = self.store_all(base_dir, base_iri, context_path)

        # If some graphs were not stored properly, then no one will be uploaded to the triplestore
        # Anyway, we should highlight those ones that could have been added in principle, by
        # mentioning them with a ".notuploaded" marker
        # NOTE(review): store_all as written never returns None entries, so this
        # branch is purely defensive (e.g. against subclasses overriding store_all).
        if None in stored_graph_path:
            for file_path in stored_graph_path:
                if file_path is not None:
                    # Create a marker for the file not uploaded in the triplestore
                    with open(f'{file_path}.notuploaded', 'wt', encoding='utf-8'):
                        pass
                    self.reperr.add_sentence("[2] "
                                             f"The statements contained in the JSON-LD file '{file_path}' "
                                             "were not uploaded into the triplestore.")
        else:  # All the files have been stored
            self.upload_all(triplestore_url, base_dir, batch_size)

    def _dir_and_file_paths(self, res: URIRef, base_dir: str, base_iri: str, process_id: int | str | None = None) -> Tuple[str, str]:
        """Return the (directory, file) paths under which res must be stored."""
        is_json: bool = (self.output_format == "json-ld")
        return find_paths(res, base_dir, base_iri, self.default_dir, self.dir_split, self.n_file_item, is_json=is_json, process_id=process_id)

    @staticmethod
    def _class_to_entity_type(entity: AbstractEntity) -> str:
        """Map an entity instance to the entity_type label expected by get_update_query."""
        if isinstance(entity, GraphEntity):
            return "graph"
        elif isinstance(entity, ProvEntity):
            return "prov"
        else:
            return "metadata"

    def upload_all(self, triplestore_url: str, base_dir: str | None = None, batch_size: int = 10,
                   save_queries: bool = False) -> bool:
        """
        Upload SPARQL update queries to the triplestore in batches, or save them to disk.

        Args:
            triplestore_url: SPARQL endpoint URL
            base_dir: Base directory for output files (required when save_queries is True)
            batch_size: Number of queries per SPARQL batch
            save_queries: If True, save combined SPARQL queries to disk instead of uploading

        Returns:
            True if all batches were processed successfully, False otherwise
        """
        self.repok.new_article()
        self.reperr.new_article()

        if batch_size <= 0:
            # Guard against nonsensical batch sizes by restoring the default.
            batch_size = 10

        query_batch: list = []
        added_statements: int = 0
        removed_statements: int = 0
        result: bool = True
        to_be_uploaded_dir: str = ""

        if base_dir:
            to_be_uploaded_dir = os.path.join(base_dir, "to_be_uploaded")
            os.makedirs(to_be_uploaded_dir, exist_ok=True)

        entities_to_process = self.a_set.res_to_entity.values()
        if self.modified_entities is not None:
            entities_to_process = [
                entity for entity in entities_to_process
                if URIRef(str(entity.res).split('/prov/se/')[0]) in self.modified_entities
            ]

        for entity in entities_to_process:
            entity_type = self._class_to_entity_type(entity)
            update_queries, n_added, n_removed = get_update_query(entity, entity_type=entity_type)

            if not update_queries:
                continue

            # Distribute the entity's statement counts across its queries.
            # divmod keeps the totals exact: plain integer division per query
            # would silently drop the remainder and under-report the counts.
            n_queries = len(update_queries)
            added_share, added_rest = divmod(n_added, n_queries)
            removed_share, removed_rest = divmod(n_removed, n_queries)

            for idx, query in enumerate(update_queries):
                query_batch.append(query)
                added_statements += added_share + (1 if idx < added_rest else 0)
                removed_statements += removed_share + (1 if idx < removed_rest else 0)

                if len(query_batch) >= batch_size:
                    result &= self._dispatch_batch(query_batch, triplestore_url, base_dir, save_queries,
                                                   to_be_uploaded_dir, added_statements, removed_statements)
                    query_batch = []
                    added_statements = 0
                    removed_statements = 0

        # Flush the final, possibly partial, batch.
        if query_batch:
            result &= self._dispatch_batch(query_batch, triplestore_url, base_dir, save_queries,
                                           to_be_uploaded_dir, added_statements, removed_statements)

        return result

    def _dispatch_batch(self, query_batch: list, triplestore_url: str, base_dir: str | None,
                        save_queries: bool, to_be_uploaded_dir: str,
                        added_statements: int, removed_statements: int) -> bool:
        """Join a batch into one update string and save it to disk or send it to the endpoint."""
        query_string = " ; ".join(query_batch)
        if save_queries:
            self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
            return True
        return self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)

    def _save_query(self, query_string: str, directory: str, added_statements: int, removed_statements: int) -> None:
        """Write query_string under directory, naming the file by content hash and statement counts."""
        content_hash = hashlib.sha256(query_string.encode('utf-8')).hexdigest()[:16]
        file_name = f"{content_hash}_add{added_statements}_remove{removed_statements}.sparql"
        file_path = os.path.join(directory, file_name)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(query_string)

    def upload(self, entity: AbstractEntity, triplestore_url: str, base_dir: str | None = None) -> bool:
        """Upload the update queries of a single entity to the triplestore."""
        self.repok.new_article()
        self.reperr.new_article()

        entity_type = self._class_to_entity_type(entity)
        update_queries, n_added, n_removed = get_update_query(entity, entity_type=entity_type)
        query_string = " ; ".join(update_queries) if update_queries else ""
        return self._query(query_string, triplestore_url, base_dir, n_added, n_removed)

    def execute_query(self, query_string: str, triplestore_url: str) -> bool:
        """Run an arbitrary SPARQL update against the triplestore."""
        self.repok.new_article()
        self.reperr.new_article()

        return self._query(query_string, triplestore_url)

    def _query(self, query_string: str, triplestore_url: str, base_dir: str | None = None,
               added_statements: int = 0, removed_statements: int = 0) -> bool:
        """
        Run a SPARQL update against triplestore_url. On communication failure
        the query is dumped under base_dir/tp_err (when base_dir is given) so
        it can be replayed later.

        :return: True on success, False otherwise (including empty queries).
        """
        if query_string != "":
            try:
                with SPARQLClient(triplestore_url, max_retries=3, backoff_factor=2.5) as client:
                    client.update(query_string)

                self.repok.add_sentence(
                    f"Triplestore updated with {added_statements} added statements and "
                    f"with {removed_statements} removed statements.")

                return True

            except EndpointError as e:
                self.reperr.add_sentence("[3] "
                                         "Graph was not loaded into the "
                                         f"triplestore due to communication problems: {e}")
                if base_dir is not None:
                    # exist_ok makes a separate existence pre-check unnecessary.
                    tp_err_dir: str = os.path.join(base_dir, "tp_err")
                    os.makedirs(tp_err_dir, exist_ok=True)
                    cur_file_err: str = os.path.join(
                        tp_err_dir, datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt'))
                    with open(cur_file_err, 'wt', encoding='utf-8') as f:
                        f.write(query_string)

        return False