Coverage for oc_ocdm/storer.py: 82%

272 statements  

coverage.py v6.5.0, created at 2025-12-05 23:58 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.
from __future__ import annotations

import gzip
import hashlib
import json
import os
from datetime import datetime
from typing import TYPE_CHECKING
from zipfile import ZIP_DEFLATED, ZipFile

from filelock import FileLock

from oc_ocdm.graph.graph_entity import GraphEntity
from oc_ocdm.metadata.metadata_entity import MetadataEntity
from oc_ocdm.prov.prov_entity import ProvEntity
from oc_ocdm.reader import Reader
from oc_ocdm.support.query_utils import get_separated_queries, get_update_query, serialize_graph_to_nquads
from oc_ocdm.support.reporter import Reporter
from oc_ocdm.support.support import find_paths
from rdflib import Dataset, URIRef
from sparqlite import SPARQLClient, EndpointError

if TYPE_CHECKING:
    from typing import Any, Dict, List, Optional, Set, Tuple

    from oc_ocdm.abstract_entity import AbstractEntity
    from oc_ocdm.abstract_set import AbstractSet


class Storer(object):

    def __init__(self, abstract_set: AbstractSet, repok: Reporter = None, reperr: Reporter = None,
                 context_map: Dict[str, Any] = None, default_dir: str = "_", dir_split: int = 0,
                 n_file_item: int = 1, output_format: str = "json-ld", zip_output: bool = False,
                 modified_entities: set = None) -> None:
        # We accept only format strings that:
        #   1. are supported by rdflib;
        #   2. correspond to an output format that is effectively either N-Triples or N-Quads.
        # The only exception to this rule is 'json-ld', which is also the default value of 'output_format'.
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        if output_format not in supported_formats:
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {supported_formats}.")
        else:
            self.output_format: str = output_format
        self.zip_output = zip_output
        self.dir_split: int = dir_split
        self.n_file_item: int = n_file_item
        self.default_dir: str = default_dir if default_dir != "" else "_"
        self.a_set: AbstractSet = abstract_set
        self.modified_entities = modified_entities

        if context_map is not None:
            self.context_map: Dict[str, Any] = context_map
        else:
            self.context_map: Dict[str, Any] = {}

        if self.output_format == "json-ld":
            for context_url in self.context_map:
                ctx_file_path: Any = self.context_map[context_url]
                if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                    # This expensive operation is done only when it's really needed
                    with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                        self.context_map[context_url] = json.load(ctx_f)

        if repok is None:
            self.repok: Reporter = Reporter(prefix="[Storer: INFO] ")
        else:
            self.repok: Reporter = repok

        if reperr is None:
            self.reperr: Reporter = Reporter(prefix="[Storer: ERROR] ")
        else:
            self.reperr: Reporter = reperr
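
    # Illustrative construction sketch (added in this report, not part of the
    # original module). It assumes a populated GraphSet from
    # oc_ocdm.graph.graph_set bound to the name `g_set`; any other AbstractSet
    # (e.g. a ProvSet or MetadataSet) is handled the same way:
    #
    #     storer = Storer(g_set, output_format="json-ld", zip_output=True,
    #                     dir_split=10000, n_file_item=1000)
    #
    # dir_split and n_file_item are forwarded to find_paths() and decide how
    # entity IRIs are bucketed into directories and files on disk.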

    def store_graphs_in_file(self, file_path: str, context_path: str = None) -> None:
        self.repok.new_article()
        self.reperr.new_article()
        self.repok.add_sentence("Store the graphs into a file: starting process")

        cg: Dataset = Dataset()
        for g in self.a_set.graphs():
            cg.addN(item + (g.identifier,) for item in g)

        self._store_in_file(cg, file_path, context_path)

    def _store_in_file(self, cur_g: Dataset, cur_file_path: str, context_path: str = None) -> None:
        zip_file_path = cur_file_path.replace(os.path.splitext(cur_file_path)[1], ".zip")

        if self.zip_output:
            with ZipFile(zip_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
                self._write_graph(cur_g, zip_file, cur_file_path, context_path)
        else:
            self._write_graph(cur_g, None, cur_file_path, context_path)

        self.repok.add_sentence(f"File '{cur_file_path}' added.")

    def _write_graph(self, graph: Dataset, zip_file: Optional[ZipFile], cur_file_path: str, context_path: Optional[str]) -> None:
        if self.output_format == "json-ld":
            if context_path is not None and context_path in self.context_map:
                cur_json_ld = json.loads(graph.serialize(format="json-ld", context=self.context_map[context_path]))
                # Replace the inlined context with a reference to the shared context document
                if isinstance(cur_json_ld, dict):
                    cur_json_ld["@context"] = context_path
                else:
                    for item in cur_json_ld:
                        item["@context"] = context_path
                if zip_file is not None:
                    data = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=data)
                else:
                    with open(cur_file_path, 'wt', encoding='utf-8') as f:
                        json.dump(cur_json_ld, f, ensure_ascii=False)
            else:
                if zip_file is not None:
                    data = graph.serialize(format="json-ld").encode('utf-8')
                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=data)
                else:
                    graph.serialize(destination=cur_file_path, format="json-ld")
        else:
            # Handle the other (N-Triples/N-Quads) RDF formats
            if zip_file is not None:
                rdf_serialization = graph.serialize(destination=None, format=self.output_format, encoding="utf-8")
                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=rdf_serialization)
            else:
                graph.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")
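
    # Sketch of the effect of the JSON-LD branch above (illustrative only):
    # with a hypothetical context_path such as "https://example.org/context.json"
    # registered in context_map, a serialized document like
    #
    #     {"@context": {...full inlined context...}, "@graph": [...]}
    #
    # is rewritten as
    #
    #     {"@context": "https://example.org/context.json", "@graph": [...]}
    #
    # so the stored files reference the shared context document instead of
    # embedding it in every file.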

    def store_all(self, base_dir: str, base_iri: str, context_path: str = None, process_id: int | str = None) -> List[str]:
        self.repok.new_article()
        self.reperr.new_article()

        self.repok.add_sentence("Starting the process")

        relevant_paths: Dict[str, list] = dict()
        created_dirs = set()
        for entity in self.a_set.res_to_entity.values():
            is_relevant = True
            if self.modified_entities is not None and URIRef(entity.res.split('/prov/se/')[0]) not in self.modified_entities:
                is_relevant = False
            if is_relevant:
                cur_dir_path, cur_file_path = self._dir_and_file_paths(entity.res, base_dir, base_iri, process_id)
                if cur_dir_path not in created_dirs:
                    os.makedirs(cur_dir_path, exist_ok=True)
                    created_dirs.add(cur_dir_path)
                relevant_paths.setdefault(cur_file_path, list())
                relevant_paths[cur_file_path].append(entity)

        for relevant_path, entities_in_path in relevant_paths.items():
            stored_g = None
            # Here we try to obtain a reference to the currently stored graph
            output_filepath = relevant_path.replace(os.path.splitext(relevant_path)[1], ".zip") if self.zip_output else relevant_path
            lock = FileLock(f"{output_filepath}.lock")
            with lock:
                if os.path.exists(output_filepath):
                    stored_g = Reader(context_map=self.context_map).load(output_filepath)
                if stored_g is None:
                    stored_g = Dataset()
                for entity_in_path in entities_in_path:
                    self.store(entity_in_path, stored_g, relevant_path, context_path, False)
                self._store_in_file(stored_g, relevant_path, context_path)

        return list(relevant_paths.keys())
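
    # Usage sketch for store_all() (illustrative, not part of the original
    # module). Assuming `storer` wraps a populated set whose entities share the
    # base IRI below, a call such as
    #
    #     paths = storer.store_all("./rdf/", "https://w3id.org/oc/meta/")
    #
    # groups the entities by target file, merges them with whatever is already
    # on disk (guarded by a per-file FileLock), rewrites each file, and returns
    # the list of file paths that were written.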

    def store(self, entity: AbstractEntity, destination_g: Dataset, cur_file_path: str, context_path: str = None, store_now: bool = True) -> Dataset:
        self.repok.new_article()
        self.reperr.new_article()

        try:
            if isinstance(entity, ProvEntity):
                quads: List[Tuple] = []
                graph_identifier: URIRef = entity.g.identifier
                for triple in entity.g.triples((entity.res, None, None)):
                    quads.append((*triple, graph_identifier))
                destination_g.addN(quads)
            elif isinstance(entity, GraphEntity) or isinstance(entity, MetadataEntity):
                if entity.to_be_deleted:
                    destination_g.remove((entity.res, None, None, None))
                else:
                    if len(entity.preexisting_graph) > 0:
                        # We're not in 'append mode', so we need to remove
                        # the entity that we're going to overwrite.
                        destination_g.remove((entity.res, None, None, None))
                    # Here we copy data from the entity into the stored graph.
                    # If the entity was marked as to be deleted, then we're
                    # done because we already removed all of its triples.
                    quads: List[Tuple] = []
                    graph_identifier: URIRef = entity.g.identifier
                    for triple in entity.g.triples((entity.res, None, None)):
                        quads.append((*triple, graph_identifier))
                    destination_g.addN(quads)

            if store_now:
                self._store_in_file(destination_g, cur_file_path, context_path)

            return destination_g
        except Exception as e:
            self.reperr.add_sentence(f"[1] It was impossible to store the RDF statements in {cur_file_path}. {e}")
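
    # Behaviour sketch for store() (illustrative): for a GraphEntity `br` that
    # already exists on disk (its preexisting_graph is non-empty), a call like
    #
    #     storer.store(br, destination_g, "./rdf/br/1000.json", store_now=False)
    #
    # first drops every quad about br.res from destination_g and then re-adds
    # the current state of the entity, so the stored file always reflects the
    # latest snapshot rather than an append of old and new triples. Entities
    # flagged with to_be_deleted are only removed.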

    def upload_and_store(self, base_dir: str, triplestore_url: str, base_iri: str, context_path: str = None,
                         batch_size: int = 10) -> None:
        stored_graph_path: List[str] = self.store_all(base_dir, base_iri, context_path)

        # If some graphs were not stored properly, then none of them is uploaded to the
        # triplestore. However, the files that could have been uploaded in principle are
        # flagged with a ".notuploaded" marker.
        if None in stored_graph_path:
            for file_path in stored_graph_path:
                if file_path is not None:
                    # Create a marker for the file that was not uploaded to the triplestore
                    open(f'{file_path}.notuploaded', 'wt', encoding='utf-8').close()
                    self.reperr.add_sentence("[2] "
                                             f"The statements contained in the JSON-LD file '{file_path}' "
                                             "were not uploaded into the triplestore.")
        else:  # All the files have been stored
            self.upload_all(triplestore_url, base_dir, batch_size)

    def _dir_and_file_paths(self, res: URIRef, base_dir: str, base_iri: str, process_id: int | str = None) -> Tuple[str, str]:
        is_json: bool = (self.output_format == "json-ld")
        return find_paths(res, base_dir, base_iri, self.default_dir, self.dir_split, self.n_file_item, is_json=is_json, process_id=process_id)

    @staticmethod
    def _class_to_entity_type(entity: AbstractEntity) -> Optional[str]:
        if isinstance(entity, GraphEntity):
            return "graph"
        elif isinstance(entity, ProvEntity):
            return "prov"
        elif isinstance(entity, MetadataEntity):
            return "metadata"
        else:
            return None

    def upload_all(self, triplestore_url: str, base_dir: str = None, batch_size: int = 10,
                   save_queries: bool = False, prepare_bulk_load: bool = False,
                   bulk_load_dir: str = None) -> bool:
        """
        Upload queries to the triplestore or save them to disk.

        Three usage modes (see the usage sketch after this method):
            1. Default (save_queries=False, prepare_bulk_load=False): execute combined SPARQL queries on the triplestore
            2. Save queries (save_queries=True, prepare_bulk_load=False): save combined SPARQL queries to disk
            3. Bulk load (save_queries=False, prepare_bulk_load=True): prepare data for the Virtuoso bulk loader (N-Quads + delete queries)

        Args:
            triplestore_url: SPARQL endpoint URL
            base_dir: Base directory for output files
            batch_size: Number of queries per SPARQL batch
            save_queries: If True, save combined SPARQL queries to disk instead of uploading them
            prepare_bulk_load: If True, prepare data for the Virtuoso bulk loader (separate N-Quads files + delete queries)
            bulk_load_dir: Directory for the N-Quads files (required if prepare_bulk_load=True)

        Returns:
            True if successful, False otherwise
        """
        self.repok.new_article()
        self.reperr.new_article()

        if save_queries and prepare_bulk_load:
            raise ValueError("save_queries and prepare_bulk_load are mutually exclusive")

        if prepare_bulk_load and not bulk_load_dir:
            raise ValueError("bulk_load_dir is required when prepare_bulk_load=True")

        if batch_size <= 0:
            batch_size = 10

        query_batch: list = []
        added_statements: int = 0
        removed_statements: int = 0
        result: bool = True

        if base_dir:
            to_be_uploaded_dir = os.path.join(base_dir, "to_be_uploaded")
            os.makedirs(to_be_uploaded_dir, exist_ok=True)

        if prepare_bulk_load:
            os.makedirs(bulk_load_dir, exist_ok=True)
            nquads_buffer: list = []
            nquads_count: int = 0
            nquads_file_index: int = 0
            nquads_batch_size: int = 1000000

        entities_to_process = self.a_set.res_to_entity.values()
        if self.modified_entities is not None:
            entities_to_process = [
                entity for entity in entities_to_process
                if URIRef(str(entity.res).split('/prov/se/')[0]) in self.modified_entities
            ]

        for entity in entities_to_process:
            entity_type = self._class_to_entity_type(entity)

            if prepare_bulk_load:
                insert_queries, delete_queries, n_added, n_removed, insert_graph = get_separated_queries(entity, entity_type=entity_type)

                if not insert_queries and not delete_queries:
                    continue

                if insert_queries:
                    quads = serialize_graph_to_nquads(insert_graph, entity.g.identifier)
                    nquads_buffer.extend(quads)
                    nquads_count += len(quads)

                    if nquads_count >= nquads_batch_size:
                        self._write_nquads_file(nquads_buffer, bulk_load_dir, nquads_file_index)
                        nquads_file_index += 1
                        nquads_buffer = []
                        nquads_count = 0

                for delete_query in delete_queries:
                    query_batch.append(delete_query)
                    removed_statements += n_removed // len(delete_queries)

                    if len(query_batch) >= batch_size:
                        query_string = " ; ".join(query_batch)
                        self._save_query(query_string, to_be_uploaded_dir, 0, removed_statements)
                        query_batch = []
                        removed_statements = 0
            else:
                update_queries, n_added, n_removed = get_update_query(entity, entity_type=entity_type)

                if not update_queries:
                    continue

                for query in update_queries:
                    query_batch.append(query)
                    added_statements += n_added // len(update_queries)
                    removed_statements += n_removed // len(update_queries)

                    if len(query_batch) >= batch_size:
                        query_string = " ; ".join(query_batch)
                        if save_queries:
                            self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
                        else:
                            result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)
                        query_batch = []
                        added_statements = 0
                        removed_statements = 0

        if query_batch:
            query_string = " ; ".join(query_batch)
            if prepare_bulk_load:
                self._save_query(query_string, to_be_uploaded_dir, 0, removed_statements)
            elif save_queries:
                self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
            else:
                result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)

        if prepare_bulk_load and nquads_buffer:
            self._write_nquads_file(nquads_buffer, bulk_load_dir, nquads_file_index)

        return result
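
    # Usage sketch for the three upload_all() modes (illustrative, not part of
    # the original module; endpoint URL and directories are placeholders):
    #
    #     # 1. Execute the combined SPARQL updates directly on the endpoint
    #     storer.upload_all("http://localhost:8890/sparql", base_dir="./rdf/")
    #
    #     # 2. Save the combined SPARQL updates under ./rdf/to_be_uploaded/
    #     storer.upload_all("http://localhost:8890/sparql", base_dir="./rdf/",
    #                       save_queries=True)
    #
    #     # 3. Write gzipped N-Quads for the Virtuoso bulk loader plus the
    #     #    corresponding delete queries
    #     storer.upload_all("http://localhost:8890/sparql", base_dir="./rdf/",
    #                       prepare_bulk_load=True, bulk_load_dir="./bulk/")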

    def _write_nquads_file(self, nquads: list, output_dir: str, file_index: int) -> None:
        """
        Writes N-Quads to a gzipped file.

        Args:
            nquads: List of N-Quad strings
            output_dir: Output directory
            file_index: File index used for naming
        """
        filename = f"bulk_load_{file_index:05d}.nq.gz"
        filepath = os.path.join(output_dir, filename)

        with gzip.open(filepath, 'wt', encoding='utf-8') as f:
            f.writelines(nquads)

        self.repok.add_sentence(f"Written {len(nquads)} quads to {filename}")
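
    # Illustrative note (not part of the original module): with the
    # nquads_batch_size of 1,000,000 used by upload_all(), the bulk-load
    # directory fills up with files named
    #
    #     bulk_load_00000.nq.gz, bulk_load_00001.nq.gz, ...
    #
    # each holding roughly one million N-Quad lines, ready to be fed to the
    # Virtuoso bulk loader.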

    def _save_query(self, query_string: str, directory: str, added_statements: int, removed_statements: int) -> None:
        content_hash = hashlib.sha256(query_string.encode('utf-8')).hexdigest()[:16]
        file_name = f"{content_hash}_add{added_statements}_remove{removed_statements}.sparql"
        file_path = os.path.join(directory, file_name)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(query_string)
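
    # Illustrative note (not part of the original module): the file name embeds
    # a 16-character SHA-256 prefix of the query plus the statement counters,
    # e.g. something of the form
    #
    #     3f2c9a7b1d4e8f06_add25_remove3.sparql
    #
    # so saving the same batch twice overwrites a single file instead of
    # accumulating duplicates.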

    def upload(self, entity: AbstractEntity, triplestore_url: str, base_dir: str = None) -> bool:
        self.repok.new_article()
        self.reperr.new_article()

        update_query, n_added, n_removed = get_update_query(entity, entity_type=self._class_to_entity_type(entity))

        return self._query(update_query, triplestore_url, base_dir, n_added, n_removed)

    def execute_query(self, query_string: str, triplestore_url: str) -> bool:
        self.repok.new_article()
        self.reperr.new_article()

        return self._query(query_string, triplestore_url)

    def _query(self, query_string: str, triplestore_url: str, base_dir: str = None,
               added_statements: int = 0, removed_statements: int = 0) -> bool:
        if query_string != "":
            try:
                with SPARQLClient(triplestore_url, max_retries=3, backoff_factor=2.5) as client:
                    client.update(query_string)

                self.repok.add_sentence(
                    f"Triplestore updated with {added_statements} added statements and "
                    f"with {removed_statements} removed statements.")

                return True

            except EndpointError as e:
                self.reperr.add_sentence("[3] "
                                         "Graph was not loaded into the "
                                         f"triplestore due to communication problems: {e}")
                if base_dir is not None:
                    tp_err_dir: str = base_dir + os.sep + "tp_err"
                    if not os.path.exists(tp_err_dir):
                        os.makedirs(tp_err_dir, exist_ok=True)
                    cur_file_err: str = tp_err_dir + os.sep + \
                        datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt')
                    with open(cur_file_err, 'wt', encoding='utf-8') as f:
                        f.write(query_string)

        return False
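

# Minimal end-to-end sketch (added for this report, not part of the original
# module). It assumes oc_ocdm's GraphSet with a base_iri-first constructor and
# an add_br(resp_agent=...) factory method; the endpoint URL, directories, and
# the ORCID are placeholders to adapt to your setup.
if __name__ == "__main__":
    from oc_ocdm.graph.graph_set import GraphSet

    base_iri = "https://w3id.org/oc/meta/"
    g_set = GraphSet(base_iri)
    # Create one bibliographic resource so the set has something to persist
    g_set.add_br(resp_agent="https://orcid.org/0000-0000-0000-0000")

    storer = Storer(g_set, output_format="json-ld", zip_output=False,
                    dir_split=10000, n_file_item=1000)
    # Write the JSON-LD files to disk and, if an endpoint is reachable,
    # push the corresponding SPARQL updates to it.
    written_files = storer.store_all("./rdf/", base_iri)
    print(f"Stored files: {written_files}")
    storer.upload_all("http://localhost:8890/sparql", base_dir="./rdf/", batch_size=10)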