Coverage for oc_ocdm/storer.py: 82%
272 statements
coverage.py v6.5.0, created at 2025-12-05 23:58 +0000
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.
from __future__ import annotations

import gzip
import hashlib
import json
import os
from datetime import datetime
from typing import TYPE_CHECKING
from zipfile import ZIP_DEFLATED, ZipFile

from filelock import FileLock

from oc_ocdm.graph.graph_entity import GraphEntity
from oc_ocdm.metadata.metadata_entity import MetadataEntity
from oc_ocdm.prov.prov_entity import ProvEntity
from oc_ocdm.reader import Reader
from oc_ocdm.support.query_utils import get_separated_queries, get_update_query, serialize_graph_to_nquads
from oc_ocdm.support.reporter import Reporter
from oc_ocdm.support.support import find_paths
from rdflib import Dataset, URIRef
from sparqlite import SPARQLClient, EndpointError

if TYPE_CHECKING:
    from typing import Any, Dict, List, Optional, Set, Tuple

    from oc_ocdm.abstract_entity import AbstractEntity
    from oc_ocdm.abstract_set import AbstractSet


class Storer(object):

    def __init__(self, abstract_set: AbstractSet, repok: Reporter = None, reperr: Reporter = None,
                 context_map: Dict[str, Any] = None, default_dir: str = "_", dir_split: int = 0,
                 n_file_item: int = 1, output_format: str = "json-ld", zip_output: bool = False, modified_entities: set = None) -> None:
        # We only accept format strings that:
        # 1. are supported by rdflib
        # 2. correspond to an output format which is effectively either NT or NQ
        # The only exception to this rule is the 'json-ld' format, which is the default value of 'output_format'.
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        if output_format not in supported_formats:
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {supported_formats}.")
        else:
            self.output_format: str = output_format
        self.zip_output = zip_output
        self.dir_split: int = dir_split
        self.n_file_item: int = n_file_item
        self.default_dir: str = default_dir if default_dir != "" else "_"
        self.a_set: AbstractSet = abstract_set
        self.modified_entities = modified_entities

        if context_map is not None:
            self.context_map: Dict[str, Any] = context_map
        else:
            self.context_map: Dict[str, Any] = {}

        if self.output_format == "json-ld":
            for context_url in self.context_map:
                ctx_file_path: Any = self.context_map[context_url]
                if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                    # This expensive operation is done only when it's really needed
                    with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                        self.context_map[context_url] = json.load(ctx_f)

        if repok is None:
            self.repok: Reporter = Reporter(prefix="[Storer: INFO] ")
        else:
            self.repok: Reporter = repok

        if reperr is None:
            self.reperr: Reporter = Reporter(prefix="[Storer: ERROR] ")
        else:
            self.reperr: Reporter = reperr
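
    # Illustrative construction (a hedged sketch, not part of the original module:
    # `my_graph_set` stands for any AbstractSet instance, e.g. a GraphSet, and the
    # keyword values below are arbitrary examples):
    #
    #     storer = Storer(my_graph_set, output_format="nquads", zip_output=True,
    #                     dir_split=10000, n_file_item=1000)
    #
    # Passing a format outside 'supported_formats' (e.g. "turtle") raises a
    # ValueError, as enforced in __init__ above.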

    def store_graphs_in_file(self, file_path: str, context_path: str = None) -> None:
        self.repok.new_article()
        self.reperr.new_article()
        self.repok.add_sentence("Store the graphs into a file: starting process")

        cg: Dataset = Dataset()
        for g in self.a_set.graphs():
            cg.addN(item + (g.identifier,) for item in g)

        self._store_in_file(cg, file_path, context_path)

    def _store_in_file(self, cur_g: Dataset, cur_file_path: str, context_path: str = None) -> None:
        zip_file_path = cur_file_path.replace(os.path.splitext(cur_file_path)[1], ".zip")

        if self.zip_output:
            with ZipFile(zip_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
                self._write_graph(cur_g, zip_file, cur_file_path, context_path)
        else:
            self._write_graph(cur_g, None, cur_file_path, context_path)

        self.repok.add_sentence(f"File '{cur_file_path}' added.")

    def _write_graph(self, graph: Dataset, zip_file: ZipFile, cur_file_path, context_path):
        if self.output_format == "json-ld":
            if context_path is not None and context_path in self.context_map:
                cur_json_ld = json.loads(graph.serialize(format="json-ld", context=self.context_map[context_path]))
                if isinstance(cur_json_ld, dict):
                    cur_json_ld["@context"] = context_path
                else:
                    for item in cur_json_ld:
                        item["@context"] = context_path
                if zip_file is not None:
                    data = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=data)
                else:
                    with open(cur_file_path, 'wt', encoding='utf-8') as f:
                        json.dump(cur_json_ld, f, ensure_ascii=False)
            else:
                if zip_file is not None:
                    data = graph.serialize(format="json-ld").encode('utf-8')
                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=data)
                else:
                    graph.serialize(destination=cur_file_path, format="json-ld")
        else:
            # Handle other RDF formats
            if zip_file is not None:
                rdf_serialization = graph.serialize(destination=None, format=self.output_format, encoding="utf-8")
                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=rdf_serialization)
            else:
                graph.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")

    def store_all(self, base_dir: str, base_iri: str, context_path: str = None, process_id: int|str = None) -> List[str]:
        self.repok.new_article()
        self.reperr.new_article()

        self.repok.add_sentence("Starting the process")

        relevant_paths: Dict[str, list] = dict()
        created_dirs = set()
        for entity in self.a_set.res_to_entity.values():
            is_relevant = True
            if self.modified_entities is not None and URIRef(entity.res.split('/prov/se/')[0]) not in self.modified_entities:
                is_relevant = False
            if is_relevant:
                cur_dir_path, cur_file_path = self._dir_and_file_paths(entity.res, base_dir, base_iri, process_id)
                if cur_dir_path not in created_dirs:
                    os.makedirs(cur_dir_path, exist_ok=True)
                    created_dirs.add(cur_dir_path)
                relevant_paths.setdefault(cur_file_path, list())
                relevant_paths[cur_file_path].append(entity)

        for relevant_path, entities_in_path in relevant_paths.items():
            stored_g = None
            # Here we try to obtain a reference to the currently stored graph
            output_filepath = relevant_path.replace(os.path.splitext(relevant_path)[1], ".zip") if self.zip_output else relevant_path
            lock = FileLock(f"{output_filepath}.lock")
            with lock:
                if os.path.exists(output_filepath):
                    stored_g = Reader(context_map=self.context_map).load(output_filepath)
                if stored_g is None:
                    stored_g = Dataset()
                for entity_in_path in entities_in_path:
                    self.store(entity_in_path, stored_g, relevant_path, context_path, False)
                self._store_in_file(stored_g, relevant_path, context_path)

        return list(relevant_paths.keys())
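
    # Illustrative call (a hedged sketch; the directory and base IRI are placeholders):
    #
    #     paths = storer.store_all("./rdf/", "https://w3id.org/oc/meta/")
    #
    # The returned list contains one output file path per group of entities; when
    # zip_output is True the data is actually written to the corresponding ".zip"
    # file, while the returned paths keep the original extension.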

    def store(self, entity: AbstractEntity, destination_g: Dataset, cur_file_path: str, context_path: str = None, store_now: bool = True) -> Dataset:
        self.repok.new_article()
        self.reperr.new_article()

        try:
            if isinstance(entity, ProvEntity):
                quads: List[Tuple] = []
                graph_identifier: URIRef = entity.g.identifier
                for triple in entity.g.triples((entity.res, None, None)):
                    quads.append((*triple, graph_identifier))
                destination_g.addN(quads)
            elif isinstance(entity, GraphEntity) or isinstance(entity, MetadataEntity):
                if entity.to_be_deleted:
                    destination_g.remove((entity.res, None, None, None))
                else:
                    if len(entity.preexisting_graph) > 0:
                        """
                        We're not in 'append mode', so we need to remove
                        the entity that we're going to overwrite.
                        """
                        destination_g.remove((entity.res, None, None, None))
                    """
                    Here we copy data from the entity into the stored graph.
                    If the entity was marked as to be deleted, then we're
                    done because we already removed all of its triples.
                    """
                    quads: List[Tuple] = []
                    graph_identifier: URIRef = entity.g.identifier
                    for triple in entity.g.triples((entity.res, None, None)):
                        quads.append((*triple, graph_identifier))
                    destination_g.addN(quads)

            if store_now:
                self._store_in_file(destination_g, cur_file_path, context_path)

            return destination_g
        except Exception as e:
            self.reperr.add_sentence(f"[1] It was impossible to store the RDF statements in {cur_file_path}. {e}")

    def upload_and_store(self, base_dir: str, triplestore_url: str, base_iri: str, context_path: str = None,
                         batch_size: int = 10) -> None:
        stored_graph_path: List[str] = self.store_all(base_dir, base_iri, context_path)

        # If some graphs were not stored properly, then none of them will be uploaded to the triplestore.
        # Anyway, we should highlight those that could have been added in principle,
        # by marking them with a ".notuploaded" file
        if None in stored_graph_path:
            for file_path in stored_graph_path:
                if file_path is not None:
                    # Create a marker for the file not uploaded to the triplestore
                    open(f'{file_path}.notuploaded', 'wt', encoding='utf-8').close()
                    self.reperr.add_sentence("[2] "
                                             f"The statements contained in the JSON-LD file '{file_path}' "
                                             "were not uploaded into the triplestore.")
        else:  # All the files have been stored
            self.upload_all(triplestore_url, base_dir, batch_size)

    def _dir_and_file_paths(self, res: URIRef, base_dir: str, base_iri: str, process_id: int|str = None) -> Tuple[str, str]:
        is_json: bool = (self.output_format == "json-ld")
        return find_paths(res, base_dir, base_iri, self.default_dir, self.dir_split, self.n_file_item, is_json=is_json, process_id=process_id)

    @staticmethod
    def _class_to_entity_type(entity: AbstractEntity) -> Optional[str]:
        if isinstance(entity, GraphEntity):
            return "graph"
        elif isinstance(entity, ProvEntity):
            return "prov"
        elif isinstance(entity, MetadataEntity):
            return "metadata"
        else:
            return None

    def upload_all(self, triplestore_url: str, base_dir: str = None, batch_size: int = 10,
                   save_queries: bool = False, prepare_bulk_load: bool = False,
                   bulk_load_dir: str = None) -> bool:
        """
        Upload queries to the triplestore or save them to disk.

        Three usage modes:
        1. Default (save_queries=False, prepare_bulk_load=False): execute the combined SPARQL queries on the triplestore
        2. Save queries (save_queries=True, prepare_bulk_load=False): save the combined SPARQL queries to disk
        3. Bulk load (save_queries=False, prepare_bulk_load=True): prepare data for the Virtuoso bulk loader (N-Quads + delete queries)

        Args:
            triplestore_url: SPARQL endpoint URL
            base_dir: Base directory for output files
            batch_size: Number of queries per SPARQL batch
            save_queries: If True, save the combined SPARQL queries to disk instead of uploading them
            prepare_bulk_load: If True, prepare data for the Virtuoso bulk loader (separate N-Quads + delete queries)
            bulk_load_dir: Directory for N-Quads files (required if prepare_bulk_load=True)

        Returns:
            True if successful, False otherwise
        """
        self.repok.new_article()
        self.reperr.new_article()

        if save_queries and prepare_bulk_load:
            raise ValueError("save_queries and prepare_bulk_load are mutually exclusive")

        if prepare_bulk_load and not bulk_load_dir:
            raise ValueError("bulk_load_dir is required when prepare_bulk_load=True")

        if batch_size <= 0:
            batch_size = 10

        query_batch: list = []
        added_statements: int = 0
        removed_statements: int = 0
        result: bool = True

        if base_dir:
            to_be_uploaded_dir = os.path.join(base_dir, "to_be_uploaded")
            os.makedirs(to_be_uploaded_dir, exist_ok=True)

        if prepare_bulk_load:
            os.makedirs(bulk_load_dir, exist_ok=True)
            nquads_buffer: list = []
            nquads_count: int = 0
            nquads_file_index: int = 0
            nquads_batch_size: int = 1000000

        entities_to_process = self.a_set.res_to_entity.values()
        if self.modified_entities is not None:
            entities_to_process = [
                entity for entity in entities_to_process
                if URIRef(str(entity.res).split('/prov/se/')[0]) in self.modified_entities
            ]

        for entity in entities_to_process:
            entity_type = self._class_to_entity_type(entity)

            if prepare_bulk_load:
                insert_queries, delete_queries, n_added, n_removed, insert_graph = get_separated_queries(entity, entity_type=entity_type)

                if not insert_queries and not delete_queries:
                    continue

                if insert_queries:
                    quads = serialize_graph_to_nquads(insert_graph, entity.g.identifier)
                    nquads_buffer.extend(quads)
                    nquads_count += len(quads)

                    if nquads_count >= nquads_batch_size:
                        self._write_nquads_file(nquads_buffer, bulk_load_dir, nquads_file_index)
                        nquads_file_index += 1
                        nquads_buffer = []
                        nquads_count = 0

                for delete_query in delete_queries:
                    query_batch.append(delete_query)
                    removed_statements += n_removed // len(delete_queries)

                    if len(query_batch) >= batch_size:
                        query_string = " ; ".join(query_batch)
                        self._save_query(query_string, to_be_uploaded_dir, 0, removed_statements)
                        query_batch = []
                        removed_statements = 0
            else:
                update_queries, n_added, n_removed = get_update_query(entity, entity_type=entity_type)

                if not update_queries:
                    continue

                for query in update_queries:
                    query_batch.append(query)
                    added_statements += n_added // len(update_queries)
                    removed_statements += n_removed // len(update_queries)

                    if len(query_batch) >= batch_size:
                        query_string = " ; ".join(query_batch)
                        if save_queries:
                            self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
                        else:
                            result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)
                        query_batch = []
                        added_statements = 0
                        removed_statements = 0

        if query_batch:
            query_string = " ; ".join(query_batch)
            if prepare_bulk_load:
                self._save_query(query_string, to_be_uploaded_dir, 0, removed_statements)
            elif save_queries:
                self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
            else:
                result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)

        if prepare_bulk_load and nquads_buffer:
            self._write_nquads_file(nquads_buffer, bulk_load_dir, nquads_file_index)

        return result

    def _write_nquads_file(self, nquads: list, output_dir: str, file_index: int) -> None:
        """
        Writes N-Quads to a gzipped file.

        Args:
            nquads: List of N-Quad strings
            output_dir: Output directory
            file_index: File index for naming
        """
        filename = f"bulk_load_{file_index:05d}.nq.gz"
        filepath = os.path.join(output_dir, filename)

        with gzip.open(filepath, 'wt', encoding='utf-8') as f:
            f.writelines(nquads)

        self.repok.add_sentence(f"Written {len(nquads)} quads to {filename}")

    def _save_query(self, query_string: str, directory: str, added_statements: int, removed_statements: int) -> None:
        content_hash = hashlib.sha256(query_string.encode('utf-8')).hexdigest()[:16]
        file_name = f"{content_hash}_add{added_statements}_remove{removed_statements}.sparql"
        file_path = os.path.join(directory, file_name)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(query_string)

    def upload(self, entity: AbstractEntity, triplestore_url: str, base_dir: str = None) -> bool:
        self.repok.new_article()
        self.reperr.new_article()

        update_query, n_added, n_removed = get_update_query(entity, entity_type=self._class_to_entity_type(entity))

        return self._query(update_query, triplestore_url, base_dir, n_added, n_removed)

    def execute_query(self, query_string: str, triplestore_url: str) -> bool:
        self.repok.new_article()
        self.reperr.new_article()

        return self._query(query_string, triplestore_url)

    def _query(self, query_string: str, triplestore_url: str, base_dir: str = None,
               added_statements: int = 0, removed_statements: int = 0) -> bool:
        if query_string != "":
            try:
                with SPARQLClient(triplestore_url, max_retries=3, backoff_factor=2.5) as client:
                    client.update(query_string)

                self.repok.add_sentence(
                    f"Triplestore updated with {added_statements} added statements and "
                    f"with {removed_statements} removed statements.")

                return True

            except EndpointError as e:
                self.reperr.add_sentence("[3] "
                                         "Graph was not loaded into the "
                                         f"triplestore due to communication problems: {e}")
                if base_dir is not None:
                    tp_err_dir: str = base_dir + os.sep + "tp_err"
                    if not os.path.exists(tp_err_dir):
                        os.makedirs(tp_err_dir, exist_ok=True)
                    cur_file_err: str = tp_err_dir + os.sep + \
                        datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt')
                    with open(cur_file_err, 'wt', encoding='utf-8') as f:
                        f.write(query_string)

        return False