Coverage for oc_ocdm / storer.py: 83%

362 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-08 20:23 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com> 

4# SPDX-FileCopyrightText: 2022-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8# -*- coding: utf-8 -*- 

9from __future__ import annotations 

10 

11import hashlib 

12import json 

13import os 

14from datetime import datetime 

15from typing import TYPE_CHECKING 

16from zipfile import ZIP_DEFLATED, ZipFile 

17 

18import orjson 

19from filelock import FileLock 

20from rdflib import Dataset, Literal, URIRef 

21from triplelite import TripleLite 

22 

23from oc_ocdm.constants import RDF_TYPE, XSD_STRING 

24from oc_ocdm.graph.graph_entity import GraphEntity 

25from oc_ocdm.metadata.metadata_entity import MetadataEntity 

26from oc_ocdm.prov.prov_entity import ProvEntity 

27from oc_ocdm.reader import Reader, _transform_jsonld_graphs 

28from oc_ocdm.support.query_utils import get_update_query 

29from oc_ocdm.support.reporter import Reporter 

30from oc_ocdm.support.sparql import SPARQLEndpointError, sparql_update 

31from oc_ocdm.support.support import find_paths 

32 

33if TYPE_CHECKING: 

34 from typing import Any, Dict, List, Set, Tuple 

35 

36 from oc_ocdm.abstract_entity import AbstractEntity 

37 from oc_ocdm.abstract_set import AbstractSet 

38 

39 

40def _entity_to_jsonld_dict(entity) -> dict: 

41 result: dict = {"@id": entity.res} 

42 types: list[str] = [] 

43 props: dict[str, list] = {} 

44 for _, p, o in entity.g.triples((entity.res, None, None)): 

45 if p == RDF_TYPE: 

46 types.append(o.value) 

47 else: 

48 if o.type == "uri": 

49 val: dict = {"@id": o.value} 

50 elif o.lang: 

51 val = {"@language": o.lang, "@value": o.value} 

52 else: 

53 val = {"@type": o.datatype if o.datatype else XSD_STRING, "@value": o.value} 

54 props.setdefault(p, []).append(val) 

55 if types: 

56 result["@type"] = types 

57 result.update(props) 

58 return result 

59 

60 

61def _compact_uri(uri: str, ns_to_prefix: list[tuple[str, str]]) -> str: 

62 for ns, prefix in ns_to_prefix: 

63 if uri.startswith(ns): 

64 return prefix + ":" + uri[len(ns):] 

65 return uri 

66 

67 

def _compact_jsonld(data: list[dict], context_path: str, ns_to_prefix: list[tuple[str, str]]) -> dict | list[dict]:
    """Shorten the IRIs in *data* and tag every graph object with ``@context``.

    The IRI rewriting itself is delegated to ``_transform_jsonld_graphs``, which
    receives a callback that applies ``_compact_uri`` with *ns_to_prefix*.
    A single resulting graph object is returned bare; otherwise the list is
    returned as-is.
    """
    def shorten(uri: str) -> str:
        return _compact_uri(uri, ns_to_prefix)

    graphs = _transform_jsonld_graphs(data, shorten)
    for graph in graphs:
        graph["@context"] = context_path
    return graphs[0] if len(graphs) == 1 else graphs

75 

76 

77class _JsonLdDoc: 

78 __slots__ = ("_entities",) 

79 

80 def __init__(self, data: list[dict]) -> None: 

81 self._entities: dict[str, dict[str, dict]] = {} 

82 for graph_obj in data: 

83 graph_iri = graph_obj["@id"] 

84 entity_index: dict[str, dict] = {} 

85 for entity_dict in graph_obj["@graph"]: 

86 entity_index[entity_dict["@id"]] = entity_dict 

87 self._entities[graph_iri] = entity_index 

88 

89 def upsert_entity(self, graph_iri: str, entity_uri: str, entity_dict: dict) -> None: 

90 if graph_iri not in self._entities: 

91 self._entities[graph_iri] = {} 

92 self._entities[graph_iri][entity_uri] = entity_dict 

93 

94 def merge_entity(self, graph_iri: str, entity_uri: str, entity_dict: dict) -> None: 

95 if graph_iri not in self._entities: 

96 self._entities[graph_iri] = {} 

97 existing = self._entities[graph_iri].get(entity_uri) 

98 if existing is None: 

99 self._entities[graph_iri][entity_uri] = entity_dict 

100 return 

101 for key, value in entity_dict.items(): 

102 if key == "@id": 

103 continue 

104 if key not in existing: 

105 existing[key] = value 

106 else: 

107 for v in value: 

108 if v not in existing[key]: 

109 existing[key].append(v) 

110 

111 def remove_entity(self, graph_iri: str, entity_uri: str) -> None: 

112 if graph_iri in self._entities and entity_uri in self._entities[graph_iri]: 

113 del self._entities[graph_iri][entity_uri] 

114 

115 def to_list(self) -> list[dict]: 

116 return [ 

117 {"@id": graph_iri, "@graph": list(entities.values())} 

118 for graph_iri, entities in self._entities.items() 

119 if entities 

120 ] 

121 

122 

class Storer(object):
    """Write the entities of an ``AbstractSet`` to RDF files and/or a SPARQL endpoint.

    Files are produced either as JSON-LD (built directly from the entities'
    triples) or as N-Triples/N-Quads (serialized through an rdflib ``Dataset``),
    optionally wrapped in a zip archive. Progress and failures are reported
    through the ``repok`` and ``reperr`` reporters.
    """

    def __init__(self, abstract_set: AbstractSet, repok: Reporter | None = None, reperr: Reporter | None = None,
                 context_map: Dict[str, Any] | None = None, default_dir: str = "_", dir_split: int = 0,
                 n_file_item: int = 1, output_format: str = "json-ld", zip_output: bool = False,
                 modified_entities: set | None = None) -> None:
        """Configure the storer.

        Args:
            abstract_set: The set whose entities will be stored or uploaded.
            repok: Reporter for informational messages (a default one is created when omitted).
            reperr: Reporter for error messages (a default one is created when omitted).
            context_map: Maps a context URL to either a context object or the path of a
                local JSON file holding it. For JSON-LD output, paths pointing to existing
                files are replaced in place by the parsed file content.
            default_dir: Passed to ``find_paths``; an empty string is replaced by ``"_"``.
            dir_split: Passed to ``find_paths``.
            n_file_item: Passed to ``find_paths``.
            output_format: One of the supported format strings (see below).
            zip_output: When True, every output file is written inside a ``.zip`` archive.
            modified_entities: When given, only entities whose URI (stripped of any
                ``/prov/se/...`` suffix) belongs to this set are stored or uploaded.

        Raises:
            ValueError: If *output_format* is not supported.
        """
        # We only accept format strings that:
        # 1. are supported by rdflib
        # 2. correspond to an output format which is effectively either NT or NQ
        # The only exception to this rule is the 'json-ld' format, which is the default value of 'output_format'.
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        if output_format not in supported_formats:
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {supported_formats}.")

        self.output_format: str = output_format
        self.zip_output = zip_output
        self.dir_split: int = dir_split
        self.n_file_item: int = n_file_item
        self.default_dir: str = default_dir if default_dir != "" else "_"
        self.a_set: AbstractSet = abstract_set
        self.modified_entities = modified_entities
        self.context_map: Dict[str, Any] = context_map if context_map is not None else {}

        if self.output_format == "json-ld":
            for context_url, ctx_file_path in self.context_map.items():
                if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                    # This expensive operation is done only when it's really needed
                    with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                        # Only the value of an existing key is replaced, so the
                        # dictionary size does not change while iterating.
                        self.context_map[context_url] = json.load(ctx_f)

        self.repok: Reporter = repok if repok is not None else Reporter(prefix="[Storer: INFO] ")
        self.reperr: Reporter = reperr if reperr is not None else Reporter(prefix="[Storer: ERROR] ")

    @staticmethod
    def _zip_path(file_path: str) -> str:
        """Return *file_path* with its extension (if any) replaced by ``.zip``.

        Only the trailing extension is touched: directory names that happen to
        contain the same extension, and paths without any extension, are
        handled correctly.
        """
        return os.path.splitext(file_path)[0] + ".zip"

    @staticmethod
    def _to_rdflib_obj(o) -> URIRef | Literal:
        """Convert a triple object exposing ``type``/``value``/``lang``/``datatype`` into an rdflib term."""
        if o.type == "literal":
            if o.lang:
                return Literal(o.value, lang=o.lang)
            # Same fallback as the JSON-LD writer: a literal without datatype is an xsd:string.
            return Literal(o.value, datatype=URIRef(o.datatype if o.datatype else XSD_STRING))
        return URIRef(o.value)

    @staticmethod
    def _entity_quads(entity_g) -> list:
        """Return every statement of *entity_g* as an ``(s, p, o, graph_id)`` quad.

        ``TripleLite`` graphs are converted term by term into rdflib objects;
        any other graph is assumed to already yield rdflib triples.
        """
        if isinstance(entity_g, TripleLite):
            graph_id = URIRef(entity_g.identifier) if entity_g.identifier else None
            return [(URIRef(s), URIRef(p), Storer._to_rdflib_obj(o), graph_id)
                    for s, p, o in entity_g]
        graph_id = entity_g.identifier
        return [(*item, graph_id) for item in entity_g]

    def store_graphs_in_file(self, file_path: str, context_path: str | None = None) -> None:
        """Write all the graphs of the set into the single file *file_path*."""
        self.repok.new_article()
        self.reperr.new_article()
        self.repok.add_sentence("Store the graphs into a file: starting process")

        if self.output_format == "json-ld":
            self._store_graphs_in_file_jsonld_fast(file_path, context_path)
            return

        cg: Dataset = Dataset()
        for g in self.a_set.graphs():
            cg.addN(self._entity_quads(g))

        self._store_in_file(cg, file_path, context_path)

    def _store_in_file(self, cur_g: Dataset, cur_file_path: str, context_path: str | None = None) -> None:
        """Serialize *cur_g* to *cur_file_path*, or to the sibling ``.zip`` when zipping is enabled."""
        if self.zip_output:
            zip_file_path = self._zip_path(cur_file_path)
            with ZipFile(zip_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
                self._write_graph(cur_g, zip_file, cur_file_path, context_path)
        else:
            self._write_graph(cur_g, None, cur_file_path, context_path)

        self.repok.add_sentence(f"File '{cur_file_path}' added.")

    def _write_graph(self, graph: Dataset, zip_file: ZipFile | None, cur_file_path: str, context_path: str | None) -> None:
        """Serialize *graph* in the configured format.

        When *zip_file* is given the serialization becomes an archive member
        named after the basename of *cur_file_path*; otherwise it is written
        to *cur_file_path* directly.
        """
        arcname = os.path.basename(cur_file_path)

        if self.output_format != "json-ld":
            # Handle other RDF formats
            if zip_file is not None:
                rdf_serialization = graph.serialize(destination=None, format=self.output_format, encoding="utf-8")
                zip_file.writestr(zinfo_or_arcname=arcname, data=rdf_serialization)
            else:
                graph.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")
            return

        if context_path is None or context_path not in self.context_map:
            # No known context: plain JSON-LD as produced by rdflib.
            if zip_file is not None:
                data = graph.serialize(format="json-ld").encode('utf-8')
                zip_file.writestr(zinfo_or_arcname=arcname, data=data)
            else:
                graph.serialize(destination=cur_file_path, format="json-ld")
            return

        # Compact with the in-memory context, then reference it by URL in the output.
        cur_json_ld = json.loads(graph.serialize(format="json-ld", context=self.context_map[context_path]))
        if isinstance(cur_json_ld, dict):
            cur_json_ld["@context"] = context_path
        else:
            for item in cur_json_ld:
                item["@context"] = context_path

        if zip_file is not None:
            data = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
            zip_file.writestr(zinfo_or_arcname=arcname, data=data)
        else:
            with open(cur_file_path, 'wt', encoding='utf-8') as f:
                json.dump(cur_json_ld, f, ensure_ascii=False)

    def store_all(self, base_dir: str, base_iri: str, context_path: str | None = None, process_id: int | str | None = None) -> List[str]:
        """Store every relevant entity in its own destination file under *base_dir*.

        Entities are grouped by destination file (computed by
        ``_dir_and_file_paths``); each file is updated under a ``FileLock``,
        starting from its current content when it already exists.

        Returns:
            The list of destination file paths (without the ``.zip`` substitution).
        """
        self.repok.new_article()
        self.reperr.new_article()

        self.repok.add_sentence("Starting the process")

        relevant_paths: Dict[str, list] = dict()
        created_dirs = set()
        for entity in self.a_set.res_to_entity.values():
            if (self.modified_entities is not None
                    and str(entity.res).split('/prov/se/')[0] not in self.modified_entities):
                continue
            cur_dir_path, cur_file_path = self._dir_and_file_paths(entity.res, base_dir, base_iri, process_id)
            if cur_dir_path not in created_dirs:
                os.makedirs(cur_dir_path, exist_ok=True)
                created_dirs.add(cur_dir_path)
            relevant_paths.setdefault(cur_file_path, list()).append(entity)

        if self.output_format == "json-ld":
            return self._store_all_jsonld_fast(relevant_paths, context_path)

        reader = Reader(context_map=self.context_map)
        for relevant_path, entities_in_path in relevant_paths.items():
            stored_g = None
            output_filepath = self._zip_path(relevant_path) if self.zip_output else relevant_path
            lock = FileLock(f"{output_filepath}.lock")
            with lock:
                if os.path.exists(output_filepath):
                    stored_g = reader.load(output_filepath)
                if stored_g is None:
                    stored_g = Dataset()
                for entity_in_path in entities_in_path:
                    self.store(entity_in_path, stored_g, relevant_path, context_path, False)
                self._store_in_file(stored_g, relevant_path, context_path)

        return list(relevant_paths.keys())

    def _entity_triples_as_rdflib_quads(self, entity: AbstractEntity) -> List[Tuple]:
        """Return the triples having ``entity.res`` as subject as rdflib ``(s, p, o, graph_id)`` quads."""
        graph_id = URIRef(entity.g.identifier) if entity.g.identifier else None
        return [(URIRef(s), URIRef(p), self._to_rdflib_obj(o), graph_id)
                for s, p, o in entity.g.triples((entity.res, None, None))]

    def store(self, entity: AbstractEntity, destination_g: Dataset, cur_file_path: str, context_path: str | None = None, store_now: bool = True) -> Dataset | None:
        """Apply the state of *entity* to *destination_g* and optionally write it out.

        Provenance entities are only added. Graph and metadata entities are
        removed when flagged ``to_be_deleted``; otherwise their previous
        statements (if they had preexisting triples) are dropped before the
        current ones are added.

        Returns:
            *destination_g* on success, ``None`` if any exception occurred
            (the error is reported through ``reperr``).
        """
        self.repok.new_article()
        self.reperr.new_article()

        try:
            if isinstance(entity, ProvEntity):
                destination_g.addN(self._entity_triples_as_rdflib_quads(entity))
            elif isinstance(entity, (GraphEntity, MetadataEntity)):
                if entity.to_be_deleted:
                    destination_g.remove((URIRef(entity.res), None, None, None))  # type: ignore[arg-type]
                else:
                    if len(entity._preexisting_triples) > 0:
                        destination_g.remove((URIRef(entity.res), None, None, None))  # type: ignore[arg-type]
                    destination_g.addN(self._entity_triples_as_rdflib_quads(entity))

            if store_now:
                self._store_in_file(destination_g, cur_file_path, context_path)

            return destination_g
        except Exception as e:
            # Best-effort by design: the failure is reported and signalled with None.
            self.reperr.add_sentence(f"[1] It was impossible to store the RDF statements in {cur_file_path}. {e}")
            return None

    def _build_ns_to_prefix(self, context_path: str) -> list[tuple[str, str]]:
        """Extract ``(namespace, prefix)`` pairs from the context, longest namespace first.

        Only string-valued, non-keyword (not starting with ``@``) terms are
        considered. A context entry that is not a dictionary (for instance a
        path that was never loaded) yields no pairs instead of failing.
        """
        ctx = self.context_map[context_path]
        if isinstance(ctx, dict) and "@context" in ctx:
            ctx = ctx["@context"]
        if not isinstance(ctx, dict):
            return []
        pairs = [
            (ns, prefix) for prefix, ns in ctx.items()
            if isinstance(ns, str) and not prefix.startswith("@")
        ]
        # Longest namespaces first, so that the most specific prefix wins when compacting.
        pairs.sort(key=lambda x: len(x[0]), reverse=True)
        return pairs

    def _write_jsonld_fast(self, json_bytes: bytes, relevant_path: str) -> None:
        """Write already-serialized JSON-LD bytes to *relevant_path* or to its ``.zip`` sibling."""
        if self.zip_output:
            with ZipFile(self._zip_path(relevant_path), mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zf:
                zf.writestr(os.path.basename(relevant_path), json_bytes)
        else:
            with open(relevant_path, 'wb') as f:
                f.write(json_bytes)
        self.repok.add_sentence(f"File '{relevant_path}' added.")

    def _store_all_jsonld_fast(self, relevant_paths: Dict[str, list], context_path: str | None) -> List[str]:
        """JSON-LD implementation of ``store_all`` working on plain dictionaries.

        For each destination file, the existing content (if any) is loaded,
        updated with the entities assigned to that file and written back,
        all while holding the file's ``FileLock``.
        """
        reader = Reader(context_map=self.context_map)
        ns_to_prefix: list[tuple[str, str]] | None = None
        if context_path is not None and context_path in self.context_map:
            ns_to_prefix = self._build_ns_to_prefix(context_path)

        for relevant_path, entities_in_path in relevant_paths.items():
            output_filepath = self._zip_path(relevant_path) if self.zip_output else relevant_path
            lock = FileLock(f"{output_filepath}.lock")
            with lock:
                existing_data: list[dict] | None = None
                if os.path.exists(output_filepath):
                    existing_data = reader.load_jsonld_dict(output_filepath)
                doc = _JsonLdDoc(existing_data if existing_data is not None else [])

                for entity in entities_in_path:
                    graph_iri = entity.g.identifier
                    if isinstance(entity, ProvEntity):
                        doc.merge_entity(graph_iri, entity.res, _entity_to_jsonld_dict(entity))
                    elif isinstance(entity, (GraphEntity, MetadataEntity)):
                        if entity.to_be_deleted:
                            doc.remove_entity(graph_iri, entity.res)
                        else:
                            if len(entity._preexisting_triples) > 0:
                                doc.remove_entity(graph_iri, entity.res)
                            doc.upsert_entity(graph_iri, entity.res, _entity_to_jsonld_dict(entity))

                output_data: list[dict] | dict = doc.to_list()
                if context_path is not None and ns_to_prefix is not None:
                    output_data = _compact_jsonld(output_data, context_path, ns_to_prefix)
                json_bytes = orjson.dumps(output_data)
                self._write_jsonld_fast(json_bytes, relevant_path)

        return list(relevant_paths.keys())

    def _store_graphs_in_file_jsonld_fast(self, file_path: str, context_path: str | None) -> None:
        """JSON-LD implementation of ``store_graphs_in_file``: dump every non-empty entity into one file."""
        doc = _JsonLdDoc([])
        for entity in self.a_set.res_to_entity.values():
            if len(entity.g) > 0:
                graph_iri = entity.g.identifier
                doc.upsert_entity(graph_iri, entity.res, _entity_to_jsonld_dict(entity))

        output_data: list[dict] | dict = doc.to_list()
        if context_path is not None and context_path in self.context_map:
            ns_to_prefix = self._build_ns_to_prefix(context_path)
            output_data = _compact_jsonld(output_data, context_path, ns_to_prefix)
        json_bytes = orjson.dumps(output_data)
        self._write_jsonld_fast(json_bytes, file_path)

    def upload_and_store(self, base_dir: str, triplestore_url: str, base_iri: str, context_path: str | None = None,
                         batch_size: int = 10) -> None:
        """Store all the entities on disk, then upload them to the triplestore."""
        stored_graph_path: List[str] = self.store_all(base_dir, base_iri, context_path)

        # If some graphs were not stored properly, then no one will be uploaded to the triplestore
        # Anyway, we should highlight those ones that could have been added in principle, by
        # mentioning them with a ".notuploaded" marker
        # NOTE(review): store_all, as written in this class, returns only the keys of its
        # path dictionary, so this branch looks unreachable today; kept for safety.
        if None in stored_graph_path:
            for file_path in stored_graph_path:
                if file_path is not None:
                    # Create a marker for the file not uploaded in the triplestore
                    open(f'{file_path}.notuploaded', 'wt', encoding='utf-8').close()
                    self.reperr.add_sentence("[2] "
                                             f"The statements contained in the JSON-LD file '{file_path}' "
                                             "were not uploaded into the triplestore.")
        else:  # All the files have been stored
            self.upload_all(triplestore_url, base_dir, batch_size)

    def _dir_and_file_paths(self, res: URIRef, base_dir: str, base_iri: str, process_id: int | str | None = None) -> Tuple[str, str]:
        """Return the ``(directory, file)`` pair computed by ``find_paths`` for the resource *res*."""
        is_json: bool = (self.output_format == "json-ld")
        return find_paths(res, base_dir, base_iri, self.default_dir, self.dir_split, self.n_file_item, is_json=is_json, process_id=process_id)

    @staticmethod
    def _class_to_entity_type(entity: AbstractEntity) -> str:
        """Map an entity to ``"graph"``, ``"prov"`` or (for anything else) ``"metadata"``."""
        if isinstance(entity, GraphEntity):
            return "graph"
        if isinstance(entity, ProvEntity):
            return "prov"
        return "metadata"

    def upload_all(self, triplestore_url: str, base_dir: str | None = None, batch_size: int = 10,
                   save_queries: bool = False) -> bool:
        """
        Upload SPARQL update queries to the triplestore in batches, or save them to disk.

        Args:
            triplestore_url: SPARQL endpoint URL
            base_dir: Base directory for output files (required when save_queries is True)
            batch_size: Number of queries per SPARQL batch (non-positive values fall back to 10)
            save_queries: If True, save combined SPARQL queries to disk instead of uploading

        Returns:
            True if all batches were processed successfully, False otherwise
        """
        self.repok.new_article()
        self.reperr.new_article()

        if batch_size <= 0:
            batch_size = 10

        query_batch: list = []
        added_statements: int = 0
        removed_statements: int = 0
        result: bool = True
        to_be_uploaded_dir: str = ""

        if base_dir:
            to_be_uploaded_dir = os.path.join(base_dir, "to_be_uploaded")
            os.makedirs(to_be_uploaded_dir, exist_ok=True)

        entities_to_process = self.a_set.res_to_entity.values()
        if self.modified_entities is not None:
            entities_to_process = [
                entity for entity in entities_to_process
                if str(entity.res).split('/prov/se/')[0] in self.modified_entities
            ]

        for entity in entities_to_process:
            entity_type = self._class_to_entity_type(entity)
            update_queries, n_added, n_removed = get_update_query(entity, entity_type=entity_type)

            if not update_queries:
                continue

            # Each query is credited with an equal share of the entity's statement counts.
            # NOTE(review): floor division drops any remainder, so the reported
            # totals can be lower than n_added / n_removed.
            added_share = n_added // len(update_queries)
            removed_share = n_removed // len(update_queries)

            for query in update_queries:
                query_batch.append(query)
                added_statements += added_share
                removed_statements += removed_share

                if len(query_batch) >= batch_size:
                    result &= self._dispatch_batch(query_batch, triplestore_url, base_dir, to_be_uploaded_dir,
                                                   added_statements, removed_statements, save_queries)
                    query_batch = []
                    added_statements = 0
                    removed_statements = 0

        if query_batch:
            result &= self._dispatch_batch(query_batch, triplestore_url, base_dir, to_be_uploaded_dir,
                                           added_statements, removed_statements, save_queries)

        return result

    def _dispatch_batch(self, query_batch: list, triplestore_url: str, base_dir: str | None, to_be_uploaded_dir: str,
                        added_statements: int, removed_statements: int, save_queries: bool) -> bool:
        """Join a batch into one update string, then either save it to disk or send it to the triplestore."""
        query_string = " ; ".join(query_batch)
        if save_queries:
            self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
            return True
        return self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)

    def _save_query(self, query_string: str, directory: str, added_statements: int, removed_statements: int) -> None:
        """Write *query_string* into *directory*, naming the file after a content hash and the statement counts."""
        content_hash = hashlib.sha256(query_string.encode('utf-8')).hexdigest()[:16]
        file_name = f"{content_hash}_add{added_statements}_remove{removed_statements}.sparql"
        file_path = os.path.join(directory, file_name)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(query_string)

    def upload(self, entity: AbstractEntity, triplestore_url: str, base_dir: str | None = None) -> bool:
        """Send the update queries of a single entity to the triplestore."""
        self.repok.new_article()
        self.reperr.new_article()

        entity_type = self._class_to_entity_type(entity)
        update_queries, n_added, n_removed = get_update_query(entity, entity_type=entity_type)
        query_string = " ; ".join(update_queries) if update_queries else ""
        return self._query(query_string, triplestore_url, base_dir, n_added, n_removed)

    def execute_query(self, query_string: str, triplestore_url: str) -> bool:
        """Send an arbitrary SPARQL update string to the triplestore."""
        self.repok.new_article()
        self.reperr.new_article()

        return self._query(query_string, triplestore_url)

    def _query(self, query_string: str, triplestore_url: str, base_dir: str | None = None,
               added_statements: int = 0, removed_statements: int = 0) -> bool:
        """Run *query_string* against the endpoint and report the outcome.

        Returns:
            True when the update succeeded. False when *query_string* is empty
            (nothing is sent) or when the endpoint raised ``SPARQLEndpointError``;
            in the latter case, if *base_dir* is given, the query is saved in a
            timestamped file under ``<base_dir>/tp_err``.
        """
        if query_string == "":
            return False

        try:
            sparql_update(triplestore_url, query_string, max_retries=3, backoff_factor=2.5)
        except SPARQLEndpointError as e:
            self.reperr.add_sentence("[3] "
                                     "Graph was not loaded into the "
                                     f"triplestore due to communication problems: {e}")
            if base_dir is not None:
                tp_err_dir: str = os.path.join(base_dir, "tp_err")
                os.makedirs(tp_err_dir, exist_ok=True)
                cur_file_err: str = os.path.join(
                    tp_err_dir, datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt'))
                with open(cur_file_err, 'wt', encoding='utf-8') as f:
                    f.write(query_string)
            return False

        self.repok.add_sentence(
            f"Triplestore updated with {added_statements} added statements and "
            f"with {removed_statements} removed statements.")
        return True