Coverage for rdflib_ocdm / storer.py: 96%
77 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 12:35 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2023-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7from __future__ import annotations
9import os
10from datetime import datetime
11from typing import TYPE_CHECKING
13from oc_ocdm.support.reporter import Reporter
14from SPARQLWrapper import SPARQLWrapper
16from rdflib_ocdm.ocdm_graph import (OCDMDataset, OCDMGraph,
17 OCDMGraphCommons)
18from rdflib_ocdm.query_utils import get_update_query
19from rdflib_ocdm.retry_utils import execute_with_retry
21if TYPE_CHECKING:
22 from typing import Set
24 from rdflib import Graph
class Storer(object):
    """Persist the pending changes of an OCDM graph/provenance set to a triplestore.

    The storer validates the requested serialization format at construction time
    and reports progress and errors through two ``Reporter`` instances.
    """

    def __init__(self, abstract_set: OCDMGraphCommons|Graph, repok: Reporter = None,
                 reperr: Reporter = None, output_format: str = "json-ld",
                 zip_output: bool = False) -> None:
        """
        :param abstract_set: the graph (or graph set) whose tracked changes will be uploaded.
        :param repok: reporter for informational messages; a default one is created when ``None``.
        :param reperr: reporter for error messages; a default one is created when ``None``.
        :param output_format: RDF serialization format identifier.
        :param zip_output: whether file output should be compressed.
        :raises ValueError: if ``output_format`` is not one of the supported formats.
        """
        self.a_set = abstract_set
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        # Validate the format before doing anything else, so an unsupported
        # value fails fast without creating reporters.
        if output_format not in supported_formats:
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {supported_formats}.")
        self.output_format: str = output_format
        self.zip_output = zip_output
        # Fall back to default reporters when none are supplied.
        if repok is None:
            self.repok: Reporter = Reporter(prefix="[Storer: INFO] ")
        else:
            self.repok: Reporter = repok
        if reperr is None:
            self.reperr: Reporter = Reporter(prefix="[Storer: ERROR] ")
        else:
            self.reperr: Reporter = reperr

    def _query(self, query_string: str, triplestore_url: str, base_dir: str = None,
               added_statements: int = 0, removed_statements: int = 0, max_retries: int = 5) -> bool:
        """Run a SPARQL update against ``triplestore_url``, retrying on transient failures.

        :param query_string: the SPARQL update to execute; an empty string is a
            no-op that returns ``False``.
        :param triplestore_url: SPARQL update endpoint.
        :param base_dir: if given, failed queries are dumped under ``<base_dir>/tp_err``
            so the update can be replayed manually.
        :param added_statements: count of added triples, used only for reporting.
        :param removed_statements: count of removed triples, used only for reporting.
        :param max_retries: maximum number of attempts handed to ``execute_with_retry``.
        :return: ``True`` on success, ``False`` otherwise.
        """
        if query_string == "":
            return False
        try:
            def execute_query():
                # A fresh SPARQLWrapper per attempt avoids carrying broken
                # connection state across retries.
                sparql: SPARQLWrapper = SPARQLWrapper(triplestore_url)
                sparql.setQuery(query_string)
                sparql.setMethod('POST')
                sparql.query()
                return True

            execute_with_retry(
                execute_query,
                max_retries=max_retries,
                reporter=self.repok
            )

            self.repok.add_sentence(
                f"Triplestore updated with {added_statements} added statements and "
                f"with {removed_statements} removed statements.")
            return True
        except ValueError as e:
            # All retries failed: record the error and, when possible, persist
            # the query to disk for later recovery.
            self.reperr.add_sentence(f"[3] Graph was not loaded into the triplestore due to communication problems: {e}")
            if base_dir is not None:
                tp_err_dir: str = os.path.join(base_dir, "tp_err")
                # exist_ok avoids the check-then-create race of the previous
                # os.path.exists() + os.makedirs() pair.
                os.makedirs(tp_err_dir, exist_ok=True)
                cur_file_err: str = os.path.join(
                    tp_err_dir,
                    datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt'))
                with open(cur_file_err, 'wt', encoding='utf-8') as f:
                    f.write(query_string)
            return False

    def upload_all(self, triplestore_url: str, base_dir: str = None, batch_size: int = 10) -> bool:
        """Upload the update queries of all tracked entities, merged in batches.

        :param triplestore_url: SPARQL update endpoint.
        :param base_dir: directory where failed queries are dumped (optional).
        :param batch_size: number of per-entity queries merged into one update
            request; non-positive values fall back to 10.
        :return: ``True`` only if every batch was stored successfully.
        """
        self.repok.new_article()
        self.reperr.new_article()
        if batch_size <= 0:
            batch_size = 10
        query_string: str = ""
        added_statements: int = 0
        removed_statements: int = 0
        skipped_queries: int = 0
        result: bool = True
        entity_type = 'graph' if isinstance(self.a_set, (OCDMGraph, OCDMDataset)) else 'prov'
        for idx, entity in enumerate(list(self.a_set.all_entities)):
            update_query, n_added, n_removed = get_update_query(self.a_set, entity, entity_type)
            if update_query == "":
                # Entities with nothing to persist are skipped and excluded
                # from the batch index so batches stay full.
                skipped_queries += 1
                continue
            index = idx - skipped_queries
            if index == 0:
                # First non-empty query starts the accumulator.
                query_string = update_query
                added_statements = n_added
                removed_statements = n_removed
            elif index % batch_size == 0:
                # Batch boundary reached: flush the accumulated batch and
                # start a new one with the current query.
                result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)
                query_string = update_query
                added_statements = n_added
                removed_statements = n_removed
            else:
                # Accumulate into the current batch; " ; " separates the
                # individual SPARQL update operations.
                query_string += " ; " + update_query
                added_statements += n_added
                removed_statements += n_removed
        if query_string != "":
            # Flush the final (possibly partial) batch.
            result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)
        return result