Coverage for rdflib_ocdm/storer.py: 93%
80 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-11-01 22:02 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-11-01 22:02 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright 2023 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
17from __future__ import annotations
19import os
20from datetime import datetime
21from typing import TYPE_CHECKING
23from oc_ocdm.support.reporter import Reporter
24from SPARQLWrapper import SPARQLWrapper
26from rdflib_ocdm.ocdm_graph import (OCDMDataset, OCDMGraph,
27 OCDMGraphCommons)
28from rdflib_ocdm.query_utils import get_update_query
29from rdflib_ocdm.retry_utils import execute_with_retry
31if TYPE_CHECKING: 31 ↛ 32line 31 didn't jump to line 32 because the condition on line 31 was never true
32 from typing import Set
34 from rdflib import Graph
class Storer(object):
    """Persist an OCDM data or provenance graph to a SPARQL triplestore.

    Wraps an ``OCDMGraphCommons`` (or plain rdflib ``Graph``) and uploads the
    pending changes of its entities as batched SPARQL UPDATE queries, with
    retry handling and error reporting.
    """

    def __init__(self, abstract_set: OCDMGraphCommons | Graph, repok: Reporter = None,
                 reperr: Reporter = None, output_format: str = "json-ld",
                 zip_output: bool = False) -> None:
        """Initialize the storer.

        :param abstract_set: the graph (data or provenance) whose entities will be uploaded
        :param repok: reporter for informational messages (a default one is created if None)
        :param reperr: reporter for error messages (a default one is created if None)
        :param output_format: serialization format; must be one of the supported RDF formats
        :param zip_output: whether file output should be compressed
        :raises ValueError: if ``output_format`` is not among the supported formats
        """
        self.a_set = abstract_set
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        if output_format not in supported_formats:
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {supported_formats}.")
        self.output_format: str = output_format
        self.zip_output = zip_output
        # Fall back to default prefixed reporters when none are supplied.
        self.repok: Reporter = repok if repok is not None else Reporter(prefix="[Storer: INFO] ")
        self.reperr: Reporter = reperr if reperr is not None else Reporter(prefix="[Storer: ERROR] ")

    def _query(self, query_string: str, triplestore_url: str, base_dir: str = None,
               added_statements: int = 0, removed_statements: int = 0, max_retries: int = 5) -> bool:
        """Execute a SPARQL UPDATE against ``triplestore_url`` with retries.

        :param query_string: the SPARQL UPDATE to run; an empty string is a no-op failure
        :param triplestore_url: endpoint accepting POSTed SPARQL updates
        :param base_dir: if given, failed queries are dumped under ``<base_dir>/tp_err/``
        :param added_statements: count of added triples, used only for reporting
        :param removed_statements: count of removed triples, used only for reporting
        :param max_retries: maximum retry attempts passed to ``execute_with_retry``
        :return: True on success, False on empty query or after all retries failed
        """
        if query_string == "":
            return False
        try:
            # Closure executed (and re-executed) by the retry utility.
            def execute_query():
                sparql: SPARQLWrapper = SPARQLWrapper(triplestore_url)
                sparql.setQuery(query_string)
                sparql.setMethod('POST')
                sparql.query()
                return True

            execute_with_retry(
                execute_query,
                max_retries=max_retries,
                reporter=self.repok
            )

            self.repok.add_sentence(
                f"Triplestore updated with {added_statements} added statements and "
                f"with {removed_statements} removed statements.")
            return True
        except ValueError as e:
            # All retries failed: report and, if possible, persist the query to disk
            # so the update can be replayed later.
            self.reperr.add_sentence(f"[3] Graph was not loaded into the triplestore due to communication problems: {e}")
            if base_dir is not None:
                tp_err_dir: str = os.path.join(base_dir, "tp_err")
                # exist_ok avoids a race between an existence check and creation.
                os.makedirs(tp_err_dir, exist_ok=True)
                cur_file_err: str = os.path.join(
                    tp_err_dir,
                    datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt'))
                with open(cur_file_err, 'wt', encoding='utf-8') as f:
                    f.write(query_string)
            return False

    def upload_all(self, triplestore_url: str, base_dir: str = None, batch_size: int = 10) -> bool:
        """Upload every entity's pending update to the triplestore in batches.

        Update queries are accumulated (joined with " ; ") until ``batch_size``
        non-empty queries have been collected, then flushed via ``_query``.

        :param triplestore_url: endpoint accepting POSTed SPARQL updates
        :param base_dir: forwarded to ``_query`` for failed-query dumps
        :param batch_size: number of queries per batch; non-positive values reset to 10
        :return: True only if every batch was uploaded successfully
        """
        self.repok.new_article()
        self.reperr.new_article()
        if batch_size <= 0:
            batch_size = 10
        query_string: str = ""
        added_statements: int = 0
        removed_statements: int = 0
        skipped_queries: int = 0
        result: bool = True
        entity_type = 'graph' if isinstance(self.a_set, (OCDMGraph, OCDMDataset)) else 'prov'
        for idx, entity in enumerate(list(self.a_set.all_entities)):
            update_query, n_added, n_removed = get_update_query(self.a_set, entity, entity_type)
            if update_query == "":
                skipped_queries += 1
            else:
                # Index among non-empty queries only, so batches stay full.
                index = idx - skipped_queries
                if index == 0:
                    # First query: start a fresh accumulator.
                    query_string = update_query
                    added_statements = n_added
                    removed_statements = n_removed
                elif index % batch_size == 0:
                    # Batch boundary: flush the accumulated batch, then restart.
                    result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)
                    query_string = update_query
                    added_statements = n_added
                    removed_statements = n_removed
                else:
                    # Accumulate into the current batch.
                    query_string += " ; " + update_query
                    added_statements += n_added
                    removed_statements += n_removed
        # Flush the final (possibly partial) batch.
        if query_string != "":
            result &= self._query(query_string, triplestore_url, base_dir, added_statements, removed_statements)
        return result