Coverage for oc_ocdm/storer.py: 72%
242 statements
coverage.py v6.5.0, created at 2025-05-30 22:05 +0000
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.
from __future__ import annotations

import json
import os
import time
from datetime import datetime
from typing import TYPE_CHECKING
from zipfile import ZIP_DEFLATED, ZipFile

from filelock import FileLock
from oc_ocdm.graph.graph_entity import GraphEntity
from oc_ocdm.metadata.metadata_entity import MetadataEntity
from oc_ocdm.prov.prov_entity import ProvEntity
from oc_ocdm.reader import Reader
from oc_ocdm.support.query_utils import get_update_query
from oc_ocdm.support.reporter import Reporter
from oc_ocdm.support.support import find_paths
from rdflib import ConjunctiveGraph, URIRef
from SPARQLWrapper import SPARQLWrapper

if TYPE_CHECKING:
    from typing import Any, Dict, List, Optional, Set, Tuple

    from oc_ocdm.abstract_entity import AbstractEntity
    from oc_ocdm.abstract_set import AbstractSet
class Storer(object):

    def __init__(self, abstract_set: AbstractSet, repok: Reporter = None, reperr: Reporter = None,
                 context_map: Dict[str, Any] = None, default_dir: str = "_", dir_split: int = 0,
                 n_file_item: int = 1, output_format: str = "json-ld", zip_output: bool = False,
                 modified_entities: set = None) -> None:
        # We only accept format strings that:
        #   1. are supported by rdflib;
        #   2. correspond to an output format which is effectively either NT or NQ.
        # The only exception to this rule is 'json-ld', which is the default value of 'output_format'.
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        if output_format not in supported_formats:
            # Use the local parameter here: 'self.output_format' has not been assigned yet.
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {supported_formats}.")
        else:
            self.output_format: str = output_format
        self.zip_output = zip_output
        self.dir_split: int = dir_split
        self.n_file_item: int = n_file_item
        self.default_dir: str = default_dir if default_dir != "" else "_"
        self.a_set: AbstractSet = abstract_set
        self.modified_entities = modified_entities

        if context_map is not None:
            self.context_map: Dict[str, Any] = context_map
        else:
            self.context_map: Dict[str, Any] = {}

        if self.output_format == "json-ld":
            for context_url in self.context_map:
                ctx_file_path: Any = self.context_map[context_url]
                if isinstance(ctx_file_path, str) and os.path.isfile(ctx_file_path):
                    # This expensive operation is done only when it is really needed
                    with open(ctx_file_path, 'rt', encoding='utf-8') as ctx_f:
                        self.context_map[context_url] = json.load(ctx_f)

        if repok is None:
            self.repok: Reporter = Reporter(prefix="[Storer: INFO] ")
        else:
            self.repok: Reporter = repok

        if reperr is None:
            self.reperr: Reporter = Reporter(prefix="[Storer: ERROR] ")
        else:
            self.reperr: Reporter = reperr
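    # -------------------------------------------------------------------------
    # Usage sketch (illustrative only, not part of the measured module): how a
    # Storer might be constructed. The 'GraphSet' class, its constructor
    # arguments and the base IRI below are assumptions taken from the wider
    # oc_ocdm package and may differ in your version.
    #
    #   from oc_ocdm.graph.graph_set import GraphSet
    #   from oc_ocdm.storer import Storer
    #
    #   g_set = GraphSet("https://w3id.org/oc/meta/")   # hypothetical base IRI
    #   storer = Storer(g_set,
    #                   output_format="json-ld",        # or any supported NT/NQ format
    #                   dir_split=10000,                # hypothetical partitioning values
    #                   n_file_item=1000,
    #                   zip_output=False)
    # -------------------------------------------------------------------------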
    def store_graphs_in_file(self, file_path: str, context_path: str = None) -> None:
        self.repok.new_article()
        self.reperr.new_article()
        self.repok.add_sentence("Store the graphs into a file: starting process")

        cg: ConjunctiveGraph = ConjunctiveGraph()
        for g in self.a_set.graphs():
            cg.addN([item + (g.identifier,) for item in list(g)])

        self._store_in_file(cg, file_path, context_path)
    def _store_in_file(self, cur_g: ConjunctiveGraph, cur_file_path: str, context_path: str = None) -> None:
        # Note: rebuilding the graph quad by quad (from here up to the serialization step) is a
        # workaround for including all the triples of the input graph in the final stored file.
        # Without it, some of them - in particular the provenance ones - are not written to that file.
        new_g: ConjunctiveGraph = ConjunctiveGraph()
        for s, p, o in cur_g.triples((None, None, None)):
            g_iri: Optional[URIRef] = None
            for g_context in cur_g.contexts((s, p, o)):
                g_iri = g_context.identifier
                break
            new_g.addN([(s, p, o, g_iri)])

        zip_file_path = cur_file_path.replace(os.path.splitext(cur_file_path)[1], ".zip")

        if self.zip_output:
            with ZipFile(zip_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
                self._write_graph(new_g, zip_file, cur_file_path, context_path)
        else:
            # Handle non-zipped output directly to a file
            self._write_graph(new_g, None, cur_file_path, context_path)

        self.repok.add_sentence(f"File '{cur_file_path}' added.")
    def _write_graph(self, graph: ConjunctiveGraph, zip_file: Optional[ZipFile],
                     cur_file_path: str, context_path: Optional[str]) -> None:
        if self.output_format == "json-ld":
            # Serialize the graph in JSON-LD format
            cur_json_ld = json.loads(graph.serialize(format="json-ld", context=self.context_map.get(context_path)))
            if context_path is not None and context_path in self.context_map:
                if isinstance(cur_json_ld, dict):
                    cur_json_ld["@context"] = context_path
                else:  # When cur_json_ld is a list
                    for item in cur_json_ld:
                        item["@context"] = context_path

            # Determine how to write based on zip file presence
            if zip_file is not None:
                dumped_json = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=dumped_json)
            else:
                with open(cur_file_path, 'wt', encoding='utf-8') as f:
                    json.dump(cur_json_ld, f, ensure_ascii=False)
        else:
            # Handle the other RDF formats (N-Triples / N-Quads)
            if zip_file is not None:
                rdf_serialization = graph.serialize(destination=None, format=self.output_format, encoding="utf-8")
                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=rdf_serialization)
            else:
                graph.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")
    def store_all(self, base_dir: str, base_iri: str, context_path: str = None, process_id: int|str = None) -> List[str]:
        self.repok.new_article()
        self.reperr.new_article()

        self.repok.add_sentence("Starting the process")

        relevant_paths: Dict[str, list] = dict()
        for entity in self.a_set.res_to_entity.values():
            is_relevant = True
            # When a set of modified entities is given, skip every entity whose base resource
            # (i.e. its IRI without the '/prov/se/...' snapshot suffix) is not in that set.
            if self.modified_entities is not None and URIRef(entity.res.split('/prov/se/')[0]) not in self.modified_entities:
                is_relevant = False
            if is_relevant:
                cur_dir_path, cur_file_path = self._dir_and_file_paths(entity.res, base_dir, base_iri, process_id)
                if not os.path.exists(cur_dir_path):
                    os.makedirs(cur_dir_path)
                relevant_paths.setdefault(cur_file_path, list())
                relevant_paths[cur_file_path].append(entity)

        for relevant_path, entities_in_path in relevant_paths.items():
            stored_g = None
            # Here we try to obtain a reference to the currently stored graph
            output_filepath = relevant_path.replace(os.path.splitext(relevant_path)[1], ".zip") if self.zip_output else relevant_path
            lock = FileLock(f"{output_filepath}.lock")
            with lock:
                if os.path.exists(output_filepath):
                    stored_g = Reader(context_map=self.context_map).load(output_filepath)
                if stored_g is None:
                    stored_g = ConjunctiveGraph()
                for entity_in_path in entities_in_path:
                    self.store(entity_in_path, stored_g, relevant_path, context_path, False)
                self._store_in_file(stored_g, relevant_path, context_path)

        return list(relevant_paths.keys())
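    # Usage sketch (illustrative only): persisting every entity of the set to disk.
    # The directory and base IRI below are placeholders, not values required by oc_ocdm.
    #
    #   written_files = storer.store_all("./rdf/", "https://w3id.org/oc/meta/")
    #   # 'written_files' lists the file paths that were (re)written; each path groups
    #   # the entities that _dir_and_file_paths() mapped to the same file.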
    def store(self, entity: AbstractEntity, destination_g: ConjunctiveGraph, cur_file_path: str, context_path: str = None, store_now: bool = True) -> ConjunctiveGraph:
        self.repok.new_article()
        self.reperr.new_article()

        try:
            if isinstance(entity, ProvEntity):
                quads: List[Tuple] = []
                graph_identifier: URIRef = entity.g.identifier
                for triple in entity.g.triples((entity.res, None, None)):
                    quads.append((*triple, graph_identifier))
                destination_g.addN(quads)
            elif isinstance(entity, GraphEntity) or isinstance(entity, MetadataEntity):
                if entity.to_be_deleted:
                    destination_g.remove((entity.res, None, None, None))
                else:
                    if len(entity.preexisting_graph) > 0:
                        """
                        We're not in 'append mode', so we need to remove
                        the entity that we're going to overwrite.
                        """
                        destination_g.remove((entity.res, None, None, None))
                    """
                    Here we copy data from the entity into the stored graph.
                    If the entity was marked as to be deleted, then we're
                    done because we already removed all of its triples.
                    """
                    quads: List[Tuple] = []
                    graph_identifier: URIRef = entity.g.identifier
                    for triple in entity.g.triples((entity.res, None, None)):
                        quads.append((*triple, graph_identifier))
                    destination_g.addN(quads)

            if store_now:
                self._store_in_file(destination_g, cur_file_path, context_path)

            return destination_g
        except Exception as e:
            self.reperr.add_sentence(f"[1] It was impossible to store the RDF statements in {cur_file_path}. {e}")
    def upload_and_store(self, base_dir: str, triplestore_url: str, base_iri: str, context_path: str = None,
                         batch_size: int = 10) -> None:
        stored_graph_path: List[str] = self.store_all(base_dir, base_iri, context_path)

        # If some graphs were not stored properly, then none of them will be uploaded to the
        # triplestore. However, the ones that could have been uploaded in principle are
        # highlighted with a ".notuploaded" marker file.
        if None in stored_graph_path:
            for file_path in stored_graph_path:
                if file_path is not None:
                    # Create a marker for the file not uploaded to the triplestore
                    open(f'{file_path}.notuploaded', 'wt', encoding='utf-8').close()
                    self.reperr.add_sentence("[2] "
                                             f"The statements contained in the JSON-LD file '{file_path}' "
                                             "were not uploaded into the triplestore.")
        else:  # All the files have been stored
            self.upload_all(triplestore_url, base_dir, batch_size)
    def _dir_and_file_paths(self, res: URIRef, base_dir: str, base_iri: str, process_id: int|str = None) -> Tuple[str, str]:
        is_json: bool = (self.output_format == "json-ld")
        return find_paths(res, base_dir, base_iri, self.default_dir, self.dir_split, self.n_file_item, is_json=is_json, process_id=process_id)

    @staticmethod
    def _class_to_entity_type(entity: AbstractEntity) -> Optional[str]:
        if isinstance(entity, GraphEntity):
            return "graph"
        elif isinstance(entity, ProvEntity):
            return "prov"
        elif isinstance(entity, MetadataEntity):
            return "metadata"
        else:
            return None
    def upload_all(self, triplestore_url: str, base_dir: str = None, batch_size: int = 10, save_queries: bool = False) -> bool:
        self.repok.new_article()
        self.reperr.new_article()

        if batch_size <= 0:
            batch_size = 10

        query_string: str = ""
        added_statements: int = 0
        removed_statements: int = 0
        skipped_queries: int = 0
        result: bool = True

        if save_queries:
            to_be_uploaded_dir = os.path.join(base_dir, "to_be_uploaded")
            os.makedirs(to_be_uploaded_dir, exist_ok=True)

        for idx, entity in enumerate(self.a_set.res_to_entity.values()):
            update_query, n_added, n_removed = get_update_query(entity, entity_type=self._class_to_entity_type(entity))

            if update_query == "":
                skipped_queries += 1
            else:
                index = idx - skipped_queries
                if index == 0:
                    # First query
                    query_string = update_query
                    added_statements = n_added
                    removed_statements = n_removed
                elif index % batch_size == 0:
                    # batch_size-multiple query: flush the accumulated batch and start a new one
                    if save_queries:
                        self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
                    else:
                        result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)
                    query_string = update_query
                    added_statements = n_added
                    removed_statements = n_removed
                else:
                    # Accumulated query: append to the current batch
                    query_string += " ; " + update_query
                    added_statements += n_added
                    removed_statements += n_removed

        if query_string != "":
            # Flush the last (possibly partial) batch
            if save_queries:
                self._save_query(query_string, to_be_uploaded_dir, added_statements, removed_statements)
            else:
                result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)

        return result
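    # Usage sketch (illustrative only): with batch_size=10, the update queries of
    # entities 0-9 are joined with " ; " and sent as a single SPARQL Update, then
    # entities 10-19, and so on; the trailing partial batch is flushed at the end.
    # The endpoint URL below is a placeholder.
    #
    #   ok = storer.upload_all("http://localhost:3030/dataset/update",
    #                          base_dir="./rdf/", batch_size=10)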
    def _save_query(self, query_string: str, directory: str, added_statements: int, removed_statements: int) -> None:
        timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
        file_name = f"{timestamp}_add{added_statements}_remove{removed_statements}.sparql"
        file_path = os.path.join(directory, file_name)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(query_string)

    def upload(self, entity: AbstractEntity, triplestore_url: str, base_dir: str = None) -> bool:
        self.repok.new_article()
        self.reperr.new_article()

        update_query, n_added, n_removed = get_update_query(entity, entity_type=self._class_to_entity_type(entity))

        return self._query(update_query, triplestore_url, base_dir, n_added, n_removed)

    def execute_query(self, query_string: str, triplestore_url: str) -> bool:
        self.repok.new_article()
        self.reperr.new_article()

        return self._query(query_string, triplestore_url)
    def _query(self, query_string: str, triplestore_url: str, base_dir: str = None,
               added_statements: int = 0, removed_statements: int = 0) -> bool:
        if query_string != "":
            attempt = 0
            max_attempts = 3
            wait_time = 5  # Initial wait time in seconds

            while attempt < max_attempts:
                try:
                    sparql: SPARQLWrapper = SPARQLWrapper(triplestore_url)
                    sparql.setQuery(query_string)
                    sparql.setMethod('POST')

                    sparql.query()

                    self.repok.add_sentence(
                        f"Triplestore updated with {added_statements} added statements and "
                        f"with {removed_statements} removed statements.")

                    return True

                except Exception as e:
                    attempt += 1
                    self.reperr.add_sentence("[3] "
                                             f"Attempt {attempt} failed. Graph was not loaded into the "
                                             f"triplestore due to communication problems: {e}")
                    if attempt < max_attempts:
                        self.reperr.add_sentence(f"Retrying in {wait_time} seconds...")
                        time.sleep(wait_time)
                        wait_time *= 2  # Double the wait time for the next attempt

                    if base_dir is not None and attempt == max_attempts:
                        self.reperr.add_sentence("[3] "
                                                 "Graph was not loaded into the "
                                                 f"triplestore due to communication problems: {e}")
                        tp_err_dir: str = base_dir + os.sep + "tp_err"
                        if not os.path.exists(tp_err_dir):
                            os.makedirs(tp_err_dir)
                        cur_file_err: str = tp_err_dir + os.sep + \
                            datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt')
                        with open(cur_file_err, 'wt', encoding='utf-8') as f:
                            f.write(query_string)

        return False
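    # Usage sketch (illustrative only): running an arbitrary SPARQL 1.1 Update
    # against an endpoint, relying on the retry/backoff logic of _query().
    # The endpoint URL and the graph/triple below are placeholders.
    #
    #   ok = storer.execute_query(
    #       "INSERT DATA { GRAPH <https://w3id.org/oc/meta/br/> "
    #       "{ <https://w3id.org/oc/meta/br/1> a <http://purl.org/spar/fabio/Expression> } }",
    #       "http://localhost:3030/dataset/update")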