Coverage for heritrace / editor.py: 99%
158 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from dataclasses import dataclass
6from datetime import datetime, timezone
8from flask import current_app
9from rdflib import Literal, URIRef
10from rdflib_ocdm.counter_handler.counter_handler import CounterHandler
11from rdflib_ocdm.ocdm_graph import OCDMDataset, OCDMGraph
12from rdflib_ocdm.reader import Reader
13from rdflib_ocdm.storer import Storer
14from SPARQLWrapper import JSON
16from heritrace.sparql import SPARQLWrapperWithRetry, get_sparql_bindings
19@dataclass(frozen=True, slots=True)
20class EndpointConfig:
21 dataset: str
22 provenance: str
23 is_quadstore: bool = True
class EditorError(Exception):
    """Raised when an edit targets a triple or entity absent from the working graph."""
class Editor:
    """Stage edits on an in-memory OCDM graph and persist them with provenance.

    The working graph (``g_set``) is an ``OCDMDataset`` when the configured
    dataset is a quadstore and an ``OCDMGraph`` otherwise. ``create``,
    ``update`` and ``delete`` only touch that in-memory graph; ``save``
    generates provenance and uploads both data and provenance to the
    configured endpoints.
    """

    def __init__(
        self,
        endpoints: EndpointConfig,
        counter_handler: CounterHandler,
        resp_agent: URIRef,
        source: URIRef | None = None,
        c_time: str | datetime | None = None,
    ) -> None:
        """Configure endpoints and create an empty working graph.

        Args:
            endpoints: Dataset/provenance endpoints and the quadstore flag.
            counter_handler: Passed to the OCDM graph constructor.
            resp_agent: Responsible agent recorded with every added statement.
            source: Optional primary source recorded with added statements.
            c_time: Optional creation time; converted once to a POSIX
                timestamp through ``to_posix_timestamp``.
        """
        self.dataset_endpoint = endpoints.dataset
        self.provenance_endpoint = endpoints.provenance
        self.counter_handler = counter_handler
        self.resp_agent = resp_agent
        self.source = source
        self.c_time = self.to_posix_timestamp(c_time)
        self.dataset_is_quadstore = endpoints.is_quadstore
        self.g_set = (
            OCDMDataset(self.counter_handler)
            if self.dataset_is_quadstore
            else OCDMGraph(self.counter_handler)
        )

    def _statement(
        self,
        subject: URIRef,
        predicate: URIRef,
        value: Literal | URIRef,
        graph: URIRef | None,
    ) -> tuple:
        """Return a quad when a named graph applies, otherwise a triple.

        A graph is only honoured when the dataset is a quadstore and the
        graph is truthy; in every other case the statement is a plain triple.
        """
        if self.dataset_is_quadstore and graph:
            return (subject, predicate, value, graph)
        return (subject, predicate, value)

    def _require_present(self, statement: tuple) -> None:
        """Raise ``EditorError`` unless ``statement`` is in the working graph."""
        if statement not in self.g_set:  # type: ignore[operator]
            terms = ", ".join(f"{term}" for term in statement)
            msg = f"Triple ({terms}) does not exist"
            raise EditorError(msg)

    def create(
        self,
        subject: URIRef,
        predicate: URIRef,
        value: Literal | URIRef,
        graph: URIRef | None = None,
    ) -> None:
        """Add a statement to the working graph.

        The statement is added as a quad when the dataset is a quadstore and
        ``graph`` is given, otherwise as a triple. The responsible agent and
        primary source of this editor are attached to the addition.
        """
        self.g_set.add(  # type: ignore[arg-type]
            self._statement(subject, predicate, value, graph),  # type: ignore[arg-type]
            resp_agent=self.resp_agent,
            primary_source=self.source,
        )

    def update(
        self,
        subject: URIRef,
        predicate: URIRef,
        old_value: Literal | URIRef,
        new_value: Literal | URIRef,
        graph: URIRef | None = None,
    ) -> None:
        """Replace ``old_value`` with ``new_value`` for ``subject``/``predicate``.

        Raises:
            EditorError: If the statement holding ``old_value`` is not in the
                working graph.
        """
        old_statement = self._statement(subject, predicate, old_value, graph)
        self._require_present(old_statement)
        self.g_set.remove(old_statement)  # type: ignore[arg-type]
        self.g_set.add(  # type: ignore[arg-type]
            self._statement(subject, predicate, new_value, graph),  # type: ignore[arg-type]
            resp_agent=self.resp_agent,
            primary_source=self.source,
        )

    def _delete_full_entity(self, subject: URIRef) -> None:
        """Remove every statement mentioning ``subject`` and mark it deleted.

        Raises:
            EditorError: If ``subject`` has no outgoing statements.
        """
        if self.dataset_is_quadstore:
            find = self.g_set.quads  # type: ignore[union-attr]
            outgoing_pattern: tuple = (subject, None, None, None)
            incoming_pattern: tuple = (None, None, subject, None)
        else:
            find = self.g_set.triples
            outgoing_pattern = (subject, None, None)
            incoming_pattern = (None, None, subject)

        # Materialise before removing: the graph is mutated inside the loop.
        outgoing = list(find(outgoing_pattern))  # type: ignore[arg-type]
        if not outgoing:
            msg = f"Entity {subject} does not exist"
            raise EditorError(msg)
        for statement in outgoing:
            self.g_set.remove(statement)  # type: ignore[arg-type]

        # Looked up only after the outgoing statements are gone, so a
        # self-referencing statement is not removed twice.
        for statement in list(find(incoming_pattern)):  # type: ignore[arg-type]
            self.g_set.remove(statement)  # type: ignore[arg-type]

        self.g_set.mark_as_deleted(subject)  # type: ignore[arg-type]

    def _delete_specific_triple(
        self,
        subject: URIRef,
        predicate: URIRef,
        value: Literal | URIRef,
        graph: URIRef | None,
    ) -> None:
        """Remove exactly one statement from the working graph.

        Raises:
            EditorError: If the statement is not in the working graph.
        """
        statement = self._statement(subject, predicate, value, graph)
        self._require_present(statement)
        self.g_set.remove(statement)  # type: ignore[arg-type]

    def _delete_all_for_predicate(
        self,
        subject: URIRef,
        predicate: URIRef,
        graph: URIRef | None,
    ) -> None:
        """Remove every value of ``predicate`` on ``subject``.

        The lookup is restricted to ``graph`` when the dataset is a quadstore
        and a graph is given.

        Raises:
            EditorError: If no matching statement exists.
        """
        graph_scoped = bool(self.dataset_is_quadstore and graph)
        if graph_scoped:
            matches = list(
                self.g_set.quads((subject, predicate, None, graph))  # type: ignore[arg-type,union-attr]
            )
        else:
            matches = list(self.g_set.triples((subject, predicate, None)))  # type: ignore[arg-type]

        if not matches:
            msg = f"No triples found with subject {subject} and predicate {predicate}"
            if graph_scoped:
                msg += f" in graph {graph}"
            raise EditorError(msg)

        for statement in matches:
            self.g_set.remove(statement)  # type: ignore[arg-type]

    def delete(
        self,
        subject: URIRef,
        predicate: URIRef | None = None,
        value: Literal | URIRef | None = None,
        graph: URIRef | None = None,
    ) -> None:
        """Delete an entity, one statement, or all values of a predicate.

        * ``predicate`` omitted: the whole entity is removed.
        * ``predicate`` and ``value`` given: only that statement is removed.
        * ``predicate`` given, ``value`` omitted: every value of the
          predicate is removed.

        Afterwards the subject is marked as deleted when no statement with it
        as subject remains in the working graph.

        Raises:
            EditorError: If the targeted entity or statement(s) do not exist.
        """
        if predicate is None:
            self._delete_full_entity(subject)
        elif value is not None:
            # Compare against None rather than truthiness: literals such as
            # Literal(""), Literal(0) or Literal(False) are falsy but still
            # identify one specific statement to delete.
            self._delete_specific_triple(subject, predicate, value, graph)
        else:
            self._delete_all_for_predicate(subject, predicate, graph)

        # Imported locally, presumably to avoid a circular import.
        from heritrace.utils.sparql_utils import get_triples_from_graph  # noqa: PLC0415

        if not list(get_triples_from_graph(self.g_set, (subject, None, None))):
            self.g_set.mark_as_deleted(subject)  # type: ignore[arg-type]

    def import_entity(self, subject: URIRef) -> None:
        """Load ``subject`` from the dataset endpoint into the working graph."""
        Reader.import_entities_from_triplestore(
            self.g_set,
            self.dataset_endpoint,
            [subject],  # type: ignore[arg-type]
        )

    def merge(self, keep_entity_uri: URIRef, delete_entity_uri: URIRef) -> None:
        """Merge ``delete_entity_uri`` into ``keep_entity_uri`` and save.

        Both entities, every subject that points at the entity being deleted,
        and every URI object it points at (``rdf:type`` objects excluded) are
        imported from the dataset endpoint before the merge is delegated to
        the working graph. The result is persisted through ``save``.

        Raises:
            ValueError: If both arguments denote the same entity.
        """
        if keep_entity_uri == delete_entity_uri:
            msg = "Cannot merge an entity with itself."
            raise ValueError(msg)

        merge_sparql = SPARQLWrapperWithRetry(self.dataset_endpoint)
        entities_to_import: set[URIRef] = {keep_entity_uri, delete_entity_uri}

        # Subjects referencing the entity to delete must be in the working
        # graph as well.
        query_incoming = (
            "SELECT DISTINCT ?s ?p WHERE {"
            f" ?s ?p <{delete_entity_uri}> ."
            f" FILTER (?s != <{keep_entity_uri}>) }}"
        )
        merge_sparql.setQuery(query_incoming)
        merge_sparql.setReturnFormat(JSON)
        for binding in get_sparql_bindings(merge_sparql.query().convert()):
            entities_to_import.add(URIRef(binding["s"]["value"]))

        query_outgoing = f"""
            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
            SELECT DISTINCT ?p ?o WHERE {{
                <{delete_entity_uri}> ?p ?o .
                FILTER (?p != rdf:type)
            }}
        """
        merge_sparql.setQuery(query_outgoing)
        merge_sparql.setReturnFormat(JSON)
        for binding in get_sparql_bindings(merge_sparql.query().convert()):
            o_node = binding["o"]
            node_type = o_node["type"]
            if node_type == "uri":
                # Only URI objects name further entities to import; literal
                # objects need no extra import.
                entities_to_import.add(URIRef(o_node["value"]))
            elif node_type not in {"literal", "typed-literal"}:
                current_app.logger.warning(
                    "Skipping non-URI/Literal object type '%s' from %s via %s",
                    node_type,
                    delete_entity_uri,
                    URIRef(binding["p"]["value"]),
                )

        Reader.import_entities_from_triplestore(
            self.g_set,
            self.dataset_endpoint,
            list(entities_to_import),  # type: ignore[arg-type]
        )
        self.g_set.preexisting_finished(self.resp_agent, self.source, self.c_time)  # type: ignore[arg-type]

        self.g_set.merge(keep_entity_uri, delete_entity_uri)  # type: ignore[arg-type]

        self.save()

    def preexisting_finished(self) -> None:
        """Call ``g_set.preexisting_finished`` with this editor's agent, source and time."""
        self.g_set.preexisting_finished(self.resp_agent, self.source, self.c_time)  # type: ignore[arg-type]

    def save(self) -> None:
        """Generate provenance, upload data and provenance, then commit.

        The dataset is uploaded before the provenance; the working graph's
        changes are committed only after both uploads have returned.
        """
        self.g_set.generate_provenance()  # type: ignore[arg-type]
        dataset_storer = Storer(self.g_set)  # type: ignore[arg-type]
        prov_storer = Storer(self.g_set.provenance)  # type: ignore[attr-defined]
        dataset_storer.upload_all(self.dataset_endpoint)  # type: ignore[arg-type]
        prov_storer.upload_all(self.provenance_endpoint)  # type: ignore[arg-type]
        self.g_set.commit_changes()  # type: ignore[arg-type]

    def to_posix_timestamp(self, value: str | datetime | None) -> float | None:
        """Convert a datetime or ISO 8601 string to a POSIX timestamp.

        Args:
            value: A ``datetime``, an ISO 8601 string, or ``None``.

        Returns:
            The timestamp in seconds, or ``None`` when ``value`` is ``None``
            or of an unsupported type.

        Raises:
            ValueError: If a string cannot be parsed by
                ``datetime.fromisoformat``.

        Note:
            A naive ISO string is interpreted as UTC, whereas a naive
            ``datetime`` goes through ``datetime.timestamp()``, which
            interprets it as local time.
        """
        if isinstance(value, datetime):
            return value.timestamp()
        if isinstance(value, str):
            parsed = datetime.fromisoformat(value)
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed.timestamp()
        # Covers None as well as any unsupported type.
        return None

    def set_primary_source(self, source: URIRef) -> None:
        """Replace the primary source attached to subsequent additions."""
        self.source = source