Coverage for oc_ocdm / storer.py: 85%
228 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-28 18:52 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2022-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8# -*- coding: utf-8 -*-
9from __future__ import annotations
11import hashlib
12import json
13import os
14from datetime import datetime
15from typing import TYPE_CHECKING
16from zipfile import ZIP_DEFLATED, ZipFile
18from filelock import FileLock
20from oc_ocdm.graph.graph_entity import GraphEntity
21from oc_ocdm.metadata.metadata_entity import MetadataEntity
22from oc_ocdm.prov.prov_entity import ProvEntity
23from oc_ocdm.reader import Reader
24from oc_ocdm.support.query_utils import get_update_query
25from oc_ocdm.support.reporter import Reporter
26from oc_ocdm.support.support import find_paths
27from rdflib import Dataset, URIRef
28from sparqlite import SPARQLClient, EndpointError
30if TYPE_CHECKING:
31 from typing import Any, Dict, List, Set, Tuple
33 from oc_ocdm.abstract_entity import AbstractEntity
34 from oc_ocdm.abstract_set import AbstractSet
class Storer(object):
    """Persist an OCDM ``AbstractSet`` to RDF files and/or a SPARQL triplestore."""

    def __init__(self, abstract_set: AbstractSet, repok: Reporter | None = None, reperr: Reporter | None = None,
                 context_map: Dict[str, Any] | None = None, default_dir: str = "_", dir_split: int = 0,
                 n_file_item: int = 1, output_format: str = "json-ld", zip_output: bool = False, modified_entities: set | None = None) -> None:
        """
        :param abstract_set: the set of entities to be stored
        :param repok: reporter for informational messages (a default one is created if None)
        :param reperr: reporter for error messages (a default one is created if None)
        :param context_map: mapping from JSON-LD context URLs to either a parsed context
            or a path to a context file; file paths are loaded in place (the dict is
            mutated) when the output format is 'json-ld'
        :param default_dir: directory name used when no split directory applies ("" is coerced to "_")
        :param dir_split: directory-split threshold forwarded to path computation
        :param n_file_item: number of items per file, forwarded to path computation
        :param output_format: rdflib serialization format; must be an NT/NQ variant or 'json-ld'
        :param zip_output: if True, serialized files are wrapped in ZIP archives
        :param modified_entities: if given, only entities whose base IRI is in this set are processed
        :raises ValueError: if *output_format* is not among the supported formats
        """
        # We only accept format strings that:
        # 1. are supported by rdflib
        # 2. correspond to an output format which is effectively either NT or NQ
        # The only exception to this rule is the 'json-ld' format, which is the default value of 'output_format'.
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        if output_format not in supported_formats:
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {supported_formats}.")
        self.output_format: str = output_format
        self.zip_output = zip_output
        self.dir_split: int = dir_split
        self.n_file_item: int = n_file_item
        self.default_dir: str = default_dir if default_dir != "" else "_"
        self.a_set: AbstractSet = abstract_set
        self.modified_entities = modified_entities

        self.context_map: Dict[str, Any] = context_map if context_map is not None else {}

        if self.output_format == "json-ld":
            for context_url in self.context_map:
                ctx_file_path: Any = self.context_map[context_url]
                # isinstance() instead of a type() comparison (idiomatic and subclass-safe).
                if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                    # This expensive operation is done only when it's really needed
                    with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                        self.context_map[context_url] = json.load(ctx_f)

        if repok is None:
            self.repok: Reporter = Reporter(prefix="[Storer: INFO] ")
        else:
            self.repok: Reporter = repok

        if reperr is None:
            self.reperr: Reporter = Reporter(prefix="[Storer: ERROR] ")
        else:
            self.reperr: Reporter = reperr
83 def store_graphs_in_file(self, file_path: str, context_path: str | None = None) -> None:
84 self.repok.new_article()
85 self.reperr.new_article()
86 self.repok.add_sentence("Store the graphs into a file: starting process")
88 cg: Dataset = Dataset()
89 for g in self.a_set.graphs():
90 cg.addN(item + (g.identifier,) for item in g) # type: ignore[arg-type]
92 self._store_in_file(cg, file_path, context_path)
94 def _store_in_file(self, cur_g: Dataset, cur_file_path: str, context_path: str | None = None) -> None:
95 zip_file_path = cur_file_path.replace(os.path.splitext(cur_file_path)[1], ".zip")
97 if self.zip_output:
98 with ZipFile(zip_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
99 self._write_graph(cur_g, zip_file, cur_file_path, context_path)
100 else:
101 self._write_graph(cur_g, None, cur_file_path, context_path)
103 self.repok.add_sentence(f"File '{cur_file_path}' added.")
105 def _write_graph(self, graph: Dataset, zip_file: ZipFile | None, cur_file_path: str, context_path: str | None) -> None:
106 if self.output_format == "json-ld":
107 if context_path is not None and context_path in self.context_map:
108 cur_json_ld = json.loads(graph.serialize(format="json-ld", context=self.context_map[context_path]))
109 if isinstance(cur_json_ld, dict):
110 cur_json_ld["@context"] = context_path
111 else:
112 for item in cur_json_ld:
113 item["@context"] = context_path
114 if zip_file is not None:
115 data = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
116 zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=data)
117 else:
118 with open(cur_file_path, 'wt', encoding='utf-8') as f:
119 json.dump(cur_json_ld, f, ensure_ascii=False)
120 else:
121 if zip_file is not None:
122 data = graph.serialize(format="json-ld").encode('utf-8')
123 zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=data)
124 else:
125 graph.serialize(destination=cur_file_path, format="json-ld")
126 else:
127 # Handle other RDF formats
128 if zip_file is not None:
129 rdf_serialization = graph.serialize(destination=None, format=self.output_format, encoding="utf-8")
130 zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=rdf_serialization)
131 else:
132 graph.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")
    def store_all(self, base_dir: str, base_iri: str, context_path: str | None = None, process_id: int | str | None = None) -> List[str]:
        """
        Store every relevant entity of the set into its RDF file under *base_dir*.

        Entities are grouped by destination file; each existing file is loaded,
        merged with the new data and rewritten while holding a file lock, so
        concurrent processes do not clobber each other's output.

        :param base_dir: root directory of the RDF file hierarchy
        :param base_iri: base IRI used to map resource IRIs onto file paths
        :param context_path: JSON-LD context URL to embed in the output, if any
        :param process_id: optional discriminator forwarded to path computation
        :return: the list of file paths that were (re)written
        """
        self.repok.new_article()
        self.reperr.new_article()

        self.repok.add_sentence("Starting the process")

        relevant_paths: Dict[str, list] = dict()
        created_dirs = set()
        for entity in self.a_set.res_to_entity.values():
            is_relevant = True
            # When a set of modified entities was given, skip everything else; a provenance
            # IRI ('.../prov/se/N') is mapped back to its base entity IRI before the check.
            if self.modified_entities is not None and URIRef(entity.res.split('/prov/se/')[0]) not in self.modified_entities:
                is_relevant = False
            if is_relevant:
                cur_dir_path, cur_file_path = self._dir_and_file_paths(entity.res, base_dir, base_iri, process_id)
                # Cache created directories to avoid redundant makedirs calls.
                if cur_dir_path not in created_dirs:
                    os.makedirs(cur_dir_path, exist_ok=True)
                    created_dirs.add(cur_dir_path)
                relevant_paths.setdefault(cur_file_path, list())
                relevant_paths[cur_file_path].append(entity)

        for relevant_path, entities_in_path in relevant_paths.items():
            stored_g = None
            # Here we try to obtain a reference to the currently stored graph
            output_filepath = relevant_path.replace(os.path.splitext(relevant_path)[1], ".zip") if self.zip_output else relevant_path
            lock = FileLock(f"{output_filepath}.lock")
            with lock:
                if os.path.exists(output_filepath):
                    stored_g = Reader(context_map=self.context_map).load(output_filepath)
                if stored_g is None:
                    stored_g = Dataset()
                for entity_in_path in entities_in_path:
                    # Merge each entity into the in-memory graph; the single disk
                    # write happens once below (store_now=False).
                    self.store(entity_in_path, stored_g, relevant_path, context_path, False)
                self._store_in_file(stored_g, relevant_path, context_path)

        return list(relevant_paths.keys())
170 def store(self, entity: AbstractEntity, destination_g: Dataset, cur_file_path: str, context_path: str | None = None, store_now: bool = True) -> Dataset | None:
171 self.repok.new_article()
172 self.reperr.new_article()
174 try:
175 if isinstance(entity, ProvEntity):
176 quads: List[Tuple] = []
177 graph_identifier = URIRef(str(entity.g.identifier))
178 for triple in entity.g.triples((entity.res, None, None)):
179 quads.append((*triple, graph_identifier))
180 destination_g.addN(quads)
181 elif isinstance(entity, GraphEntity) or isinstance(entity, MetadataEntity):
182 if entity.to_be_deleted:
183 destination_g.remove((entity.res, None, None, None)) # type: ignore[arg-type]
184 else:
185 if len(entity.preexisting_graph) > 0:
186 """
187 We're not in 'append mode', so we need to remove
188 the entity that we're going to overwrite.
189 """
190 destination_g.remove((entity.res, None, None, None)) # type: ignore[arg-type]
191 """
192 Here we copy data from the entity into the stored graph.
193 If the entity was marked as to be deleted, then we're
194 done because we already removed all of its triples.
195 """
196 quads: List[Tuple] = []
197 graph_identifier = URIRef(str(entity.g.identifier))
198 for triple in entity.g.triples((entity.res, None, None)):
199 quads.append((*triple, graph_identifier))
200 destination_g.addN(quads)
202 if store_now:
203 self._store_in_file(destination_g, cur_file_path, context_path)
205 return destination_g
206 except Exception as e:
207 self.reperr.add_sentence(f"[1] It was impossible to store the RDF statements in {cur_file_path}. {e}")
209 def upload_and_store(self, base_dir: str, triplestore_url: str, base_iri: str, context_path: str | None = None,
210 batch_size: int = 10) -> None:
211 stored_graph_path: List[str] = self.store_all(base_dir, base_iri, context_path)
213 # If some graphs were not stored properly, then no one will be uploaded to the triplestore
214 # Anyway, we should highlight those ones that could have been added in principle, by
215 # mentioning them with a ".notuploaded" marker
216 if None in stored_graph_path:
217 for file_path in stored_graph_path:
218 if file_path is not None:
219 # Create a marker for the file not uploaded in the triplestore
220 open(f'{file_path}.notuploaded', 'wt', encoding='utf-8').close()
221 self.reperr.add_sentence("[2] "
222 f"The statements contained in the JSON-LD file '{file_path}' "
223 "were not uploaded into the triplestore.")
224 else: # All the files have been stored
225 self.upload_all(triplestore_url, base_dir, batch_size)
227 def _dir_and_file_paths(self, res: URIRef, base_dir: str, base_iri: str, process_id: int | str | None = None) -> Tuple[str, str]:
228 is_json: bool = (self.output_format == "json-ld")
229 return find_paths(res, base_dir, base_iri, self.default_dir, self.dir_split, self.n_file_item, is_json=is_json, process_id=process_id)
231 @staticmethod
232 def _class_to_entity_type(entity: AbstractEntity) -> str:
233 if isinstance(entity, GraphEntity):
234 return "graph"
235 elif isinstance(entity, ProvEntity):
236 return "prov"
237 else:
238 return "metadata"
    def upload_all(self, triplestore_url: str, base_dir: str | None = None, batch_size: int = 10,
                   save_queries: bool = False) -> bool:
        """
        Upload SPARQL update queries to the triplestore in batches, or save them to disk.

        Args:
            triplestore_url: SPARQL endpoint URL
            base_dir: Base directory for output files (required when save_queries is True)
            batch_size: Number of queries per SPARQL batch
            save_queries: If True, save combined SPARQL queries to disk instead of uploading

        Returns:
            True if all batches were processed successfully, False otherwise
        """
        self.repok.new_article()
        self.reperr.new_article()

        # Guard against nonsensical batch sizes by falling back to the default.
        if batch_size <= 0:
            batch_size = 10

        query_batch: list = []
        added_statements: int = 0
        removed_statements: int = 0
        result: bool = True
        to_be_uploaded_dir: str = ""

        if base_dir:
            to_be_uploaded_dir = os.path.join(base_dir, "to_be_uploaded")
            os.makedirs(to_be_uploaded_dir, exist_ok=True)

        entities_to_process = self.a_set.res_to_entity.values()
        if self.modified_entities is not None:
            # A provenance snapshot IRI ('.../prov/se/N') is attributed to its
            # base entity IRI before checking membership.
            entities_to_process = [
                entity for entity in entities_to_process
                if URIRef(str(entity.res).split('/prov/se/')[0]) in self.modified_entities
            ]

        for entity in entities_to_process:
            entity_type = self._class_to_entity_type(entity)
            update_queries, n_added, n_removed = get_update_query(entity, entity_type=entity_type)

            if not update_queries:
                continue

            for query in update_queries:
                query_batch.append(query)
                # Statement counts are spread evenly over the entity's queries;
                # the integer division may drop a remainder from the reported totals.
                added_statements += n_added // len(update_queries)
                removed_statements += n_removed // len(update_queries)

                if len(query_batch) >= batch_size:
                    query_string = " ; ".join(query_batch)
                    if save_queries:
                        self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
                    else:
                        result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)
                    query_batch = []
                    added_statements = 0
                    removed_statements = 0

        # Flush the final, possibly partial, batch.
        if query_batch:
            query_string = " ; ".join(query_batch)
            if save_queries:
                self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
            else:
                result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)

        return result
308 def _save_query(self, query_string: str, directory: str, added_statements: int, removed_statements: int) -> None:
309 content_hash = hashlib.sha256(query_string.encode('utf-8')).hexdigest()[:16]
310 file_name = f"{content_hash}_add{added_statements}_remove{removed_statements}.sparql"
311 file_path = os.path.join(directory, file_name)
312 with open(file_path, 'w', encoding='utf-8') as f:
313 f.write(query_string)
315 def upload(self, entity: AbstractEntity, triplestore_url: str, base_dir: str | None = None) -> bool:
316 self.repok.new_article()
317 self.reperr.new_article()
319 entity_type = self._class_to_entity_type(entity)
320 update_queries, n_added, n_removed = get_update_query(entity, entity_type=entity_type)
321 query_string = " ; ".join(update_queries) if update_queries else ""
322 return self._query(query_string, triplestore_url, base_dir, n_added, n_removed)
324 def execute_query(self, query_string: str, triplestore_url: str) -> bool:
325 self.repok.new_article()
326 self.reperr.new_article()
328 return self._query(query_string, triplestore_url)
330 def _query(self, query_string: str, triplestore_url: str, base_dir: str | None = None,
331 added_statements: int = 0, removed_statements: int = 0) -> bool:
332 if query_string != "":
333 try:
334 with SPARQLClient(triplestore_url, max_retries=3, backoff_factor=2.5) as client:
335 client.update(query_string)
337 self.repok.add_sentence(
338 f"Triplestore updated with {added_statements} added statements and "
339 f"with {removed_statements} removed statements.")
341 return True
343 except EndpointError as e:
344 self.reperr.add_sentence("[3] "
345 "Graph was not loaded into the "
346 f"triplestore due to communication problems: {e}")
347 if base_dir is not None:
348 tp_err_dir: str = base_dir + os.sep + "tp_err"
349 if not os.path.exists(tp_err_dir):
350 os.makedirs(tp_err_dir, exist_ok=True)
351 cur_file_err: str = tp_err_dir + os.sep + \
352 datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt')
353 with open(cur_file_err, 'wt', encoding='utf-8') as f:
354 f.write(query_string)
356 return False