Coverage for oc_ocdm / storer.py: 83%
362 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2022-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8# -*- coding: utf-8 -*-
9from __future__ import annotations
11import hashlib
12import json
13import os
14from datetime import datetime
15from typing import TYPE_CHECKING
16from zipfile import ZIP_DEFLATED, ZipFile
18import orjson
19from filelock import FileLock
20from rdflib import Dataset, Literal, URIRef
21from triplelite import TripleLite
23from oc_ocdm.constants import RDF_TYPE, XSD_STRING
24from oc_ocdm.graph.graph_entity import GraphEntity
25from oc_ocdm.metadata.metadata_entity import MetadataEntity
26from oc_ocdm.prov.prov_entity import ProvEntity
27from oc_ocdm.reader import Reader, _transform_jsonld_graphs
28from oc_ocdm.support.query_utils import get_update_query
29from oc_ocdm.support.reporter import Reporter
30from oc_ocdm.support.sparql import SPARQLEndpointError, sparql_update
31from oc_ocdm.support.support import find_paths
33if TYPE_CHECKING:
34 from typing import Any, Dict, List, Set, Tuple
36 from oc_ocdm.abstract_entity import AbstractEntity
37 from oc_ocdm.abstract_set import AbstractSet
def _entity_to_jsonld_dict(entity) -> dict:
    """Serialise the triples whose subject is ``entity.res`` as one expanded JSON-LD node.

    ``rdf:type`` objects are gathered under ``"@type"``. Every other predicate
    maps to a list of value objects:

    - ``{"@id": ...}`` for URI objects;
    - ``{"@language": ..., "@value": ...}`` for language-tagged literals;
    - ``{"@type": datatype, "@value": ...}`` for any other literal, falling
      back to ``XSD_STRING`` when the literal carries no datatype.

    Args:
        entity: object exposing ``res`` (its IRI) and ``g`` (a graph whose
            ``triples`` objects expose ``type``, ``value``, ``lang`` and ``datatype``).

    Returns:
        A dict with ``"@id"`` first, then ``"@type"`` (only when present),
        then one key per remaining predicate.
    """
    type_values: list[str] = []
    by_predicate: dict[str, list] = {}
    for _subject, predicate, obj in entity.g.triples((entity.res, None, None)):
        if predicate == RDF_TYPE:
            type_values.append(obj.value)
            continue
        if obj.type == "uri":
            value_obj: dict = {"@id": obj.value}
        elif obj.lang:
            value_obj = {"@language": obj.lang, "@value": obj.value}
        else:
            value_obj = {"@type": obj.datatype or XSD_STRING, "@value": obj.value}
        by_predicate.setdefault(predicate, []).append(value_obj)

    node: dict = {"@id": entity.res}
    if type_values:
        node["@type"] = type_values
    node.update(by_predicate)
    return node
61def _compact_uri(uri: str, ns_to_prefix: list[tuple[str, str]]) -> str:
62 for ns, prefix in ns_to_prefix:
63 if uri.startswith(ns):
64 return prefix + ":" + uri[len(ns):]
65 return uri
def _compact_jsonld(data: list[dict], context_path: str, ns_to_prefix: list[tuple[str, str]]) -> dict | list[dict]:
    """Compact the IRIs of an expanded JSON-LD graph list and attach a context reference.

    Args:
        data: list of ``{"@id": ..., "@graph": [...]}`` graph objects.
        context_path: value written into each graph object's ``"@context"`` key.
        ns_to_prefix: ``(namespace, prefix)`` pairs passed to ``_compact_uri``.

    Returns:
        The single graph object when exactly one graph was produced, otherwise
        the full list (possibly empty).
    """
    def shorten(uri: str) -> str:
        return _compact_uri(uri, ns_to_prefix)

    graphs = _transform_jsonld_graphs(data, shorten)
    for graph in graphs:
        graph["@context"] = context_path
    return graphs[0] if len(graphs) == 1 else graphs
77class _JsonLdDoc:
78 __slots__ = ("_entities",)
80 def __init__(self, data: list[dict]) -> None:
81 self._entities: dict[str, dict[str, dict]] = {}
82 for graph_obj in data:
83 graph_iri = graph_obj["@id"]
84 entity_index: dict[str, dict] = {}
85 for entity_dict in graph_obj["@graph"]:
86 entity_index[entity_dict["@id"]] = entity_dict
87 self._entities[graph_iri] = entity_index
89 def upsert_entity(self, graph_iri: str, entity_uri: str, entity_dict: dict) -> None:
90 if graph_iri not in self._entities:
91 self._entities[graph_iri] = {}
92 self._entities[graph_iri][entity_uri] = entity_dict
94 def merge_entity(self, graph_iri: str, entity_uri: str, entity_dict: dict) -> None:
95 if graph_iri not in self._entities:
96 self._entities[graph_iri] = {}
97 existing = self._entities[graph_iri].get(entity_uri)
98 if existing is None:
99 self._entities[graph_iri][entity_uri] = entity_dict
100 return
101 for key, value in entity_dict.items():
102 if key == "@id":
103 continue
104 if key not in existing:
105 existing[key] = value
106 else:
107 for v in value:
108 if v not in existing[key]:
109 existing[key].append(v)
111 def remove_entity(self, graph_iri: str, entity_uri: str) -> None:
112 if graph_iri in self._entities and entity_uri in self._entities[graph_iri]:
113 del self._entities[graph_iri][entity_uri]
115 def to_list(self) -> list[dict]:
116 return [
117 {"@id": graph_iri, "@graph": list(entities.values())}
118 for graph_iri, entities in self._entities.items()
119 if entities
120 ]
class Storer(object):
    """Write the entities of an ``AbstractSet`` to RDF files and/or upload them to a SPARQL endpoint.

    JSON-LD output (the default) goes through a fast path that builds plain
    dictionaries and serialises them with ``orjson``. The N-Triples/N-Quads
    formats are serialised through an rdflib ``Dataset``. When ``zip_output``
    is true, every file is written inside a ``.zip`` archive whose path is the
    plain file path with its final extension replaced by ``.zip``.
    """

    def __init__(self, abstract_set: AbstractSet, repok: Reporter | None = None, reperr: Reporter | None = None,
                 context_map: Dict[str, Any] | None = None, default_dir: str = "_", dir_split: int = 0,
                 n_file_item: int = 1, output_format: str = "json-ld", zip_output: bool = False,
                 modified_entities: set | None = None) -> None:
        """Configure the storer.

        Args:
            abstract_set: set of entities to store or upload.
            repok: reporter for informational messages; a default one is created if None.
            reperr: reporter for error messages; a default one is created if None.
            context_map: maps JSON-LD context keys to either a parsed context or the
                path of a local file holding it. NOTE: for JSON-LD output this mapping
                is updated in place, replacing existing file paths with the loaded JSON.
            default_dir: directory name forwarded to ``find_paths``; ``""`` is treated as ``"_"``.
            dir_split: forwarded to ``find_paths`` to lay out the output tree.
            n_file_item: forwarded to ``find_paths`` to lay out the output tree.
            output_format: one of the supported formats listed below.
            zip_output: write each file inside a ``.zip`` archive.
            modified_entities: when given, only entities whose IRI (with any
                ``/prov/se/...`` suffix stripped) is in this set are stored/uploaded.

        Raises:
            ValueError: if ``output_format`` is not supported.
        """
        # We only accept format strings that:
        # 1. are supported by rdflib
        # 2. correspond to an output format which is effectively either NT or NQ
        # The only exception to this rule is the 'json-ld' format, which is the default value of 'output_format'.
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        if output_format not in supported_formats:
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {supported_formats}.")
        self.output_format: str = output_format

        self.zip_output = zip_output
        self.dir_split: int = dir_split
        self.n_file_item: int = n_file_item
        self.default_dir: str = default_dir if default_dir != "" else "_"
        self.a_set: AbstractSet = abstract_set
        self.modified_entities = modified_entities

        # The caller's dict is kept (not copied) on purpose: loaded contexts are cached in it.
        self.context_map: Dict[str, Any] = context_map if context_map is not None else {}

        if self.output_format == "json-ld":
            for context_url in self.context_map:
                ctx_file_path: Any = self.context_map[context_url]
                if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                    # This expensive operation is done only when it's really needed
                    with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                        self.context_map[context_url] = json.load(ctx_f)

        self.repok: Reporter = repok if repok is not None else Reporter(prefix="[Storer: INFO] ")
        self.reperr: Reporter = reperr if reperr is not None else Reporter(prefix="[Storer: ERROR] ")

    @staticmethod
    def _zip_path(file_path: str) -> str:
        """Return *file_path* with its final extension replaced by ``.zip``.

        Only the extension of the last path component is touched, so
        ``"out.json/br/1.json"`` becomes ``"out.json/br/1.zip"``. A path
        without an extension gets ``.zip`` appended.
        """
        return os.path.splitext(file_path)[0] + ".zip"

    @staticmethod
    def _split_count(total: int, parts: int) -> List[int]:
        """Split *total* into *parts* integers that sum back to *total*.

        The remainder is spread one unit at a time over the first shares, e.g.
        ``_split_count(5, 2) == [3, 2]``. Returns ``[]`` when *parts* is not positive.
        """
        if parts <= 0:
            return []
        base, remainder = divmod(total, parts)
        return [base + 1 if i < remainder else base for i in range(parts)]

    @staticmethod
    def _to_rdflib_obj(o) -> URIRef | Literal:
        """Convert a lightweight term (``type``/``value``/``lang``/``datatype``) into an rdflib term."""
        if o.type == "literal":
            if o.lang:
                return Literal(o.value, lang=o.lang)
            return Literal(o.value, datatype=URIRef(o.datatype))
        return URIRef(o.value)

    @staticmethod
    def _entity_quads(entity_g) -> list:
        """Return every triple of *entity_g* as an rdflib quad tagged with the graph's identifier.

        ``TripleLite`` graphs have their terms converted to rdflib terms. Any
        other graph is assumed to already yield rdflib triples.
        """
        if isinstance(entity_g, TripleLite):
            graph_id = URIRef(entity_g.identifier) if entity_g.identifier else None
            return [(URIRef(s), URIRef(p), Storer._to_rdflib_obj(o), graph_id)
                    for s, p, o in entity_g]
        graph_id = entity_g.identifier
        return [(*item, graph_id) for item in entity_g]

    def store_graphs_in_file(self, file_path: str, context_path: str | None = None) -> None:
        """Write every graph of the abstract set into the single file *file_path*.

        Args:
            file_path: destination file (the ``.zip`` path is derived from it when zipping).
            context_path: key into ``context_map`` used to compact JSON-LD output.
        """
        self.repok.new_article()
        self.reperr.new_article()
        self.repok.add_sentence("Store the graphs into a file: starting process")

        if self.output_format == "json-ld":
            self._store_graphs_in_file_jsonld_fast(file_path, context_path)
            return

        cg: Dataset = Dataset()
        for g in self.a_set.graphs():
            cg.addN(self._entity_quads(g))

        self._store_in_file(cg, file_path, context_path)

    def _store_in_file(self, cur_g: Dataset, cur_file_path: str, context_path: str | None = None) -> None:
        """Serialise *cur_g* to *cur_file_path*, or into its ``.zip`` counterpart when zipping."""
        if self.zip_output:
            with ZipFile(self._zip_path(cur_file_path), mode="w", compression=ZIP_DEFLATED,
                         allowZip64=True) as zip_file:
                self._write_graph(cur_g, zip_file, cur_file_path, context_path)
        else:
            self._write_graph(cur_g, None, cur_file_path, context_path)

        self.repok.add_sentence(f"File '{cur_file_path}' added.")

    def _write_graph(self, graph: Dataset, zip_file: ZipFile | None, cur_file_path: str,
                     context_path: str | None) -> None:
        """Serialise *graph* in ``self.output_format``.

        The output goes either into *zip_file* (named after the basename of
        *cur_file_path*) or directly to *cur_file_path*. For JSON-LD with a known
        *context_path*, the output is compacted with that context, and the
        ``"@context"`` value is then replaced by the *context_path* reference.
        """
        if self.output_format == "json-ld":
            if context_path is not None and context_path in self.context_map:
                cur_json_ld = json.loads(graph.serialize(format="json-ld", context=self.context_map[context_path]))
                if isinstance(cur_json_ld, dict):
                    cur_json_ld["@context"] = context_path
                else:
                    for item in cur_json_ld:
                        item["@context"] = context_path
                if zip_file is not None:
                    data = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=data)
                else:
                    with open(cur_file_path, 'wt', encoding='utf-8') as f:
                        json.dump(cur_json_ld, f, ensure_ascii=False)
            else:
                if zip_file is not None:
                    data = graph.serialize(format="json-ld").encode('utf-8')
                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=data)
                else:
                    graph.serialize(destination=cur_file_path, format="json-ld")
        else:
            # Handle other RDF formats
            if zip_file is not None:
                rdf_serialization = graph.serialize(destination=None, format=self.output_format, encoding="utf-8")
                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=rdf_serialization)
            else:
                graph.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")

    def store_all(self, base_dir: str, base_iri: str, context_path: str | None = None,
                  process_id: int | str | None = None) -> List[str]:
        """Store every relevant entity into the file that ``find_paths`` assigns to it.

        Entities sharing a file are grouped together. Each file is
        read-modify-written under a ``FileLock`` so that its existing content
        is merged with the new entities rather than overwritten.

        Args:
            base_dir: root directory of the output tree.
            base_iri: base IRI stripped from entity IRIs to compute paths.
            context_path: key into ``context_map`` used to compact JSON-LD output.
            process_id: forwarded to ``find_paths``.

        Returns:
            The list of (non-zip) file paths that were written.
        """
        self.repok.new_article()
        self.reperr.new_article()

        self.repok.add_sentence("Starting the process")

        relevant_paths: Dict[str, list] = {}
        created_dirs = set()
        for entity in self.a_set.res_to_entity.values():
            if self.modified_entities is not None and \
                    entity.res.split('/prov/se/')[0] not in self.modified_entities:
                continue
            cur_dir_path, cur_file_path = self._dir_and_file_paths(entity.res, base_dir, base_iri, process_id)
            if cur_dir_path not in created_dirs:
                os.makedirs(cur_dir_path, exist_ok=True)
                created_dirs.add(cur_dir_path)
            relevant_paths.setdefault(cur_file_path, []).append(entity)

        if self.output_format == "json-ld":
            return self._store_all_jsonld_fast(relevant_paths, context_path)

        reader = Reader(context_map=self.context_map)
        for relevant_path, entities_in_path in relevant_paths.items():
            output_filepath = self._zip_path(relevant_path) if self.zip_output else relevant_path
            with FileLock(f"{output_filepath}.lock"):
                stored_g = None
                if os.path.exists(output_filepath):
                    stored_g = reader.load(output_filepath)
                if stored_g is None:
                    stored_g = Dataset()
                for entity_in_path in entities_in_path:
                    self.store(entity_in_path, stored_g, relevant_path, context_path, False)
                self._store_in_file(stored_g, relevant_path, context_path)

        return list(relevant_paths.keys())

    def _entity_triples_as_rdflib_quads(self, entity: AbstractEntity) -> List[Tuple]:
        """Return the triples whose subject is ``entity.res`` as rdflib quads in the entity's graph."""
        graph_id = URIRef(entity.g.identifier) if entity.g.identifier else None
        return [(URIRef(s), URIRef(p), self._to_rdflib_obj(o), graph_id)
                for s, p, o in entity.g.triples((entity.res, None, None))]

    def store(self, entity: AbstractEntity, destination_g: Dataset, cur_file_path: str,
              context_path: str | None = None, store_now: bool = True) -> Dataset | None:
        """Apply *entity*'s current state to *destination_g* and optionally write it out.

        Behaviour depends on the entity:

        - Provenance entities are appended.
        - Graph/metadata entities marked for deletion have all their triples removed.
        - Other graph/metadata entities replace their previous triples (if any
          pre-existed) with the current ones.

        Args:
            entity: entity to store.
            destination_g: dataset that is updated in place.
            cur_file_path: file written when *store_now* is true; also used in error messages.
            context_path: key into ``context_map`` used to compact JSON-LD output.
            store_now: write *destination_g* to *cur_file_path* immediately.

        Returns:
            *destination_g*, or None if an exception was caught and reported to ``reperr``.
        """
        self.repok.new_article()
        self.reperr.new_article()

        try:
            if isinstance(entity, ProvEntity):
                destination_g.addN(self._entity_triples_as_rdflib_quads(entity))
            elif isinstance(entity, (GraphEntity, MetadataEntity)):
                if entity.to_be_deleted:
                    destination_g.remove((URIRef(entity.res), None, None, None))  # type: ignore[arg-type]
                else:
                    if len(entity._preexisting_triples) > 0:
                        destination_g.remove((URIRef(entity.res), None, None, None))  # type: ignore[arg-type]
                    destination_g.addN(self._entity_triples_as_rdflib_quads(entity))

            if store_now:
                self._store_in_file(destination_g, cur_file_path, context_path)

            return destination_g
        except Exception as e:
            # Deliberate best-effort: the failure is reported, not raised.
            self.reperr.add_sentence(f"[1] It was impossible to store the RDF statements in {cur_file_path}. {e}")
            return None

    def _build_ns_to_prefix(self, context_path: str) -> list[tuple[str, str]]:
        """Extract ``(namespace, prefix)`` pairs from a JSON-LD context, longest namespace first.

        Only string-valued term definitions whose key does not start with ``@``
        are used. A context given as an array contributes the terms of its
        inline objects. Remote references and non-dict entries (for example a
        context path that was never loaded) contribute nothing.
        """
        ctx = self.context_map[context_path]
        if isinstance(ctx, dict) and "@context" in ctx:
            ctx = ctx["@context"]
        if isinstance(ctx, list):
            term_maps = [item for item in ctx if isinstance(item, dict)]
        elif isinstance(ctx, dict):
            term_maps = [ctx]
        else:
            term_maps = []
        pairs = [
            (ns, prefix)
            for term_map in term_maps
            for prefix, ns in term_map.items()
            if isinstance(ns, str) and not prefix.startswith("@")
        ]
        # Longest namespaces first so that the most specific prefix wins in _compact_uri.
        pairs.sort(key=lambda pair: len(pair[0]), reverse=True)
        return pairs

    def _write_jsonld_fast(self, json_bytes: bytes, relevant_path: str) -> None:
        """Write already-serialised JSON-LD bytes to *relevant_path* or its ``.zip`` counterpart."""
        if self.zip_output:
            with ZipFile(self._zip_path(relevant_path), mode="w", compression=ZIP_DEFLATED,
                         allowZip64=True) as zf:
                zf.writestr(os.path.basename(relevant_path), json_bytes)
        else:
            with open(relevant_path, 'wb') as f:
                f.write(json_bytes)
        self.repok.add_sentence(f"File '{relevant_path}' added.")

    def _store_all_jsonld_fast(self, relevant_paths: Dict[str, list], context_path: str | None) -> List[str]:
        """JSON-LD implementation of :meth:`store_all` that works on plain dicts.

        For each file, under a ``FileLock``:

        1. Load the existing document, if any.
        2. Merge provenance entities into it.
        3. Remove deleted graph/metadata entities.
        4. Replace the other graph/metadata entities.
        5. Optionally compact IRIs, then write the result.
        """
        reader = Reader(context_map=self.context_map)
        ns_to_prefix: list[tuple[str, str]] | None = None
        if context_path is not None and context_path in self.context_map:
            ns_to_prefix = self._build_ns_to_prefix(context_path)

        for relevant_path, entities_in_path in relevant_paths.items():
            output_filepath = self._zip_path(relevant_path) if self.zip_output else relevant_path
            with FileLock(f"{output_filepath}.lock"):
                existing_data: list[dict] | None = None
                if os.path.exists(output_filepath):
                    existing_data = reader.load_jsonld_dict(output_filepath)
                doc = _JsonLdDoc(existing_data if existing_data is not None else [])

                for entity in entities_in_path:
                    graph_iri = entity.g.identifier
                    if isinstance(entity, ProvEntity):
                        doc.merge_entity(graph_iri, entity.res, _entity_to_jsonld_dict(entity))
                    elif isinstance(entity, (GraphEntity, MetadataEntity)):
                        if entity.to_be_deleted:
                            doc.remove_entity(graph_iri, entity.res)
                        else:
                            if len(entity._preexisting_triples) > 0:
                                doc.remove_entity(graph_iri, entity.res)
                            doc.upsert_entity(graph_iri, entity.res, _entity_to_jsonld_dict(entity))

                output_data: list[dict] | dict = doc.to_list()
                if context_path is not None and ns_to_prefix is not None:
                    output_data = _compact_jsonld(output_data, context_path, ns_to_prefix)
                json_bytes = orjson.dumps(output_data)
                self._write_jsonld_fast(json_bytes, relevant_path)

        return list(relevant_paths.keys())

    def _store_graphs_in_file_jsonld_fast(self, file_path: str, context_path: str | None) -> None:
        """JSON-LD implementation of :meth:`store_graphs_in_file` for entities with at least one triple."""
        doc = _JsonLdDoc([])
        for entity in self.a_set.res_to_entity.values():
            if len(entity.g) > 0:
                doc.upsert_entity(entity.g.identifier, entity.res, _entity_to_jsonld_dict(entity))

        output_data: list[dict] | dict = doc.to_list()
        if context_path is not None and context_path in self.context_map:
            ns_to_prefix = self._build_ns_to_prefix(context_path)
            output_data = _compact_jsonld(output_data, context_path, ns_to_prefix)
        json_bytes = orjson.dumps(output_data)
        self._write_jsonld_fast(json_bytes, file_path)

    def upload_and_store(self, base_dir: str, triplestore_url: str, base_iri: str, context_path: str | None = None,
                         batch_size: int = 10) -> None:
        """Store all entities to disk, then upload them to *triplestore_url* in batches."""
        stored_graph_path: List[str] = self.store_all(base_dir, base_iri, context_path)

        # If some graphs were not stored properly, then no one will be uploaded to the triplestore
        # Anyway, we should highlight those ones that could have been added in principle, by
        # mentioning them with a ".notuploaded" marker
        # NOTE(review): store_all as written never returns None entries, so this branch
        # looks unreachable — confirm before relying on it.
        if None in stored_graph_path:
            for file_path in stored_graph_path:
                if file_path is not None:
                    # Create a marker for the file not uploaded in the triplestore
                    open(f'{file_path}.notuploaded', 'wt', encoding='utf-8').close()
                    self.reperr.add_sentence("[2] "
                                             f"The statements contained in the JSON-LD file '{file_path}' "
                                             "were not uploaded into the triplestore.")
        else:  # All the files have been stored
            self.upload_all(triplestore_url, base_dir, batch_size)

    def _dir_and_file_paths(self, res: URIRef, base_dir: str, base_iri: str,
                            process_id: int | str | None = None) -> Tuple[str, str]:
        """Return the ``(directory, file)`` paths ``find_paths`` assigns to the entity *res*."""
        is_json: bool = (self.output_format == "json-ld")
        return find_paths(res, base_dir, base_iri, self.default_dir, self.dir_split, self.n_file_item,
                          is_json=is_json, process_id=process_id)

    @staticmethod
    def _class_to_entity_type(entity: AbstractEntity) -> str:
        """Map an entity to the ``entity_type`` label expected by ``get_update_query``."""
        if isinstance(entity, GraphEntity):
            return "graph"
        elif isinstance(entity, ProvEntity):
            return "prov"
        else:
            return "metadata"

    def upload_all(self, triplestore_url: str, base_dir: str | None = None, batch_size: int = 10,
                   save_queries: bool = False) -> bool:
        """
        Upload SPARQL update queries to the triplestore in batches, or save them to disk.

        Args:
            triplestore_url: SPARQL endpoint URL
            base_dir: Base directory for output files (required when save_queries is True)
            batch_size: Number of queries per SPARQL batch (non-positive values fall back to 10)
            save_queries: If True, save combined SPARQL queries to disk instead of uploading

        Returns:
            True if all batches were processed successfully, False otherwise
        """
        self.repok.new_article()
        self.reperr.new_article()

        if batch_size <= 0:
            batch_size = 10

        query_batch: list = []
        added_statements: int = 0
        removed_statements: int = 0
        result: bool = True
        to_be_uploaded_dir: str = ""

        if base_dir:
            to_be_uploaded_dir = os.path.join(base_dir, "to_be_uploaded")
            os.makedirs(to_be_uploaded_dir, exist_ok=True)

        entities_to_process = self.a_set.res_to_entity.values()
        if self.modified_entities is not None:
            entities_to_process = [
                entity for entity in entities_to_process
                if str(entity.res).split('/prov/se/')[0] in self.modified_entities
            ]

        for entity in entities_to_process:
            entity_type = self._class_to_entity_type(entity)
            update_queries, n_added, n_removed = get_update_query(entity, entity_type=entity_type)

            if not update_queries:
                continue

            # Spread the entity's statement counts over its queries without losing the remainder,
            # so batch totals stay exact even when a batch boundary splits an entity's queries.
            added_shares = self._split_count(n_added, len(update_queries))
            removed_shares = self._split_count(n_removed, len(update_queries))
            for query, added_share, removed_share in zip(update_queries, added_shares, removed_shares):
                query_batch.append(query)
                added_statements += added_share
                removed_statements += removed_share

                if len(query_batch) >= batch_size:
                    result &= self._flush_query_batch(query_batch, triplestore_url, base_dir, to_be_uploaded_dir,
                                                      added_statements, removed_statements, save_queries)
                    query_batch = []
                    added_statements = 0
                    removed_statements = 0

        if query_batch:
            result &= self._flush_query_batch(query_batch, triplestore_url, base_dir, to_be_uploaded_dir,
                                              added_statements, removed_statements, save_queries)

        return result

    def _flush_query_batch(self, query_batch: list, triplestore_url: str, base_dir: str | None,
                           to_be_uploaded_dir: str, added_statements: int, removed_statements: int,
                           save_queries: bool) -> bool:
        """Join *query_batch* with ``" ; "`` and either save it to disk or send it to the endpoint.

        Returns True when saving. When uploading, returns the result of :meth:`_query`.
        """
        query_string = " ; ".join(query_batch)
        if save_queries:
            self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
            return True
        return self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)

    def _save_query(self, query_string: str, directory: str, added_statements: int, removed_statements: int) -> None:
        """Write *query_string* to a ``.sparql`` file named after its content hash and statement counts."""
        content_hash = hashlib.sha256(query_string.encode('utf-8')).hexdigest()[:16]
        file_name = f"{content_hash}_add{added_statements}_remove{removed_statements}.sparql"
        file_path = os.path.join(directory, file_name)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(query_string)

    def upload(self, entity: AbstractEntity, triplestore_url: str, base_dir: str | None = None) -> bool:
        """Upload the update queries of a single *entity*; returns the result of :meth:`_query`."""
        self.repok.new_article()
        self.reperr.new_article()

        entity_type = self._class_to_entity_type(entity)
        update_queries, n_added, n_removed = get_update_query(entity, entity_type=entity_type)
        query_string = " ; ".join(update_queries) if update_queries else ""
        return self._query(query_string, triplestore_url, base_dir, n_added, n_removed)

    def execute_query(self, query_string: str, triplestore_url: str) -> bool:
        """Run an arbitrary SPARQL update *query_string* against *triplestore_url*."""
        self.repok.new_article()
        self.reperr.new_article()

        return self._query(query_string, triplestore_url)

    def _query(self, query_string: str, triplestore_url: str, base_dir: str | None = None,
               added_statements: int = 0, removed_statements: int = 0) -> bool:
        """Send *query_string* to *triplestore_url* via ``sparql_update`` (3 retries, backoff 2.5).

        An empty query is a no-op that returns False. If a ``SPARQLEndpointError``
        occurs, the error is reported. When *base_dir* is given, the query is also
        dumped to a timestamped file under ``<base_dir>/tp_err``.

        Returns:
            True on a successful update, False otherwise.
        """
        if query_string != "":
            try:
                sparql_update(triplestore_url, query_string, max_retries=3, backoff_factor=2.5)

                self.repok.add_sentence(
                    f"Triplestore updated with {added_statements} added statements and "
                    f"with {removed_statements} removed statements.")

                return True

            except SPARQLEndpointError as e:
                self.reperr.add_sentence("[3] "
                                         "Graph was not loaded into the "
                                         f"triplestore due to communication problems: {e}")
                if base_dir is not None:
                    tp_err_dir: str = os.path.join(base_dir, "tp_err")
                    os.makedirs(tp_err_dir, exist_ok=True)
                    cur_file_err: str = os.path.join(
                        tp_err_dir, datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt'))
                    with open(cur_file_err, 'wt', encoding='utf-8') as f:
                        f.write(query_string)

        return False