Coverage for heritrace/editor.py: 100%
157 statements
from datetime import datetime
import traceback
from typing import List, Set

from rdflib import Graph, Literal, URIRef
from rdflib_ocdm.counter_handler.counter_handler import CounterHandler
from rdflib_ocdm.ocdm_graph import OCDMConjunctiveGraph, OCDMGraph
from rdflib_ocdm.reader import Reader
from rdflib_ocdm.storer import Storer
from SPARQLWrapper import SPARQLWrapper, JSON

class Editor:
    def __init__(
        self,
        dataset_endpoint: str,
        provenance_endpoint: str,
        counter_handler: CounterHandler,
        resp_agent: URIRef,
        source: URIRef | None = None,
        c_time: datetime | None = None,
        dataset_is_quadstore: bool = True,
    ):
        self.dataset_endpoint = dataset_endpoint
        self.provenance_endpoint = provenance_endpoint
        self.counter_handler = counter_handler
        self.resp_agent = resp_agent
        self.source = source
        self.c_time = self.to_posix_timestamp(c_time)
        self.dataset_is_quadstore = dataset_is_quadstore
        self.g_set = (
            OCDMConjunctiveGraph(self.counter_handler)
            if self.dataset_is_quadstore
            else OCDMGraph(self.counter_handler)
        )
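
    # Construction sketch (illustrative; the endpoint URLs and agent URI are
    # assumptions, and my_counter_handler stands for any concrete
    # CounterHandler implementation):
    #   editor = Editor(
    #       dataset_endpoint="http://localhost:9999/sparql",
    #       provenance_endpoint="http://localhost:9999/sparql-prov",
    #       counter_handler=my_counter_handler,
    #       resp_agent=URIRef("https://orcid.org/0000-0000-0000-0000"),
    #   )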

    def _normalize_params(
        self, subject, predicate=None, graph=None
    ) -> tuple[URIRef, URIRef | None, URIRef | None]:
        """Normalize the parameters common to all graph operations."""
        # Normalize the subject
        if not isinstance(subject, URIRef):
            subject = URIRef(subject)

        # Normalize the predicate, if provided
        if predicate is not None and not isinstance(predicate, URIRef):
            predicate = URIRef(predicate)

        # Normalize the graph, if provided
        if graph is not None:
            if isinstance(graph, Graph):
                graph = graph.identifier
            elif isinstance(graph, str):
                graph = URIRef(graph)

        return subject, predicate, graph
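
    # Because of this normalization, the public methods below also accept
    # plain strings for subject, predicate, and graph, e.g.
    #   editor.create("https://example.org/s", "https://example.org/p", Literal("v"))
    # coerces subject and predicate to URIRef before touching the graph set.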

    def create(
        self,
        subject: URIRef,
        predicate: URIRef,
        value: Literal | URIRef,
        graph: URIRef | Graph | str | None = None,
    ) -> None:
        # Normalize the parameters
        subject, predicate, graph = self._normalize_params(subject, predicate, graph)

        if self.dataset_is_quadstore and graph:
            self.g_set.add(
                (subject, predicate, value, graph),
                resp_agent=self.resp_agent,
                primary_source=self.source,
            )
        else:
            self.g_set.add(
                (subject, predicate, value),
                resp_agent=self.resp_agent,
                primary_source=self.source,
            )
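
    # Example (illustrative URIs):
    #   editor.create(
    #       URIRef("https://example.org/br/1"),
    #       URIRef("http://purl.org/dc/terms/title"),
    #       Literal("A title"),
    #       graph=URIRef("https://example.org/graph/data"),
    #   )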

    def update(
        self,
        subject: URIRef,
        predicate: URIRef,
        old_value: Literal | URIRef,
        new_value: Literal | URIRef,
        graph: URIRef | Graph | str | None = None,
    ) -> None:
        # Normalize the parameters
        subject, predicate, graph = self._normalize_params(subject, predicate, graph)

        # Check that the triple exists before updating
        if self.dataset_is_quadstore and graph:
            if (subject, predicate, old_value, graph) not in self.g_set:
                raise Exception(
                    f"Triple ({subject}, {predicate}, {old_value}, {graph}) does not exist"
                )
            self.g_set.remove((subject, predicate, old_value, graph))
            self.g_set.add(
                (subject, predicate, new_value, graph),
                resp_agent=self.resp_agent,
                primary_source=self.source,
            )
        else:
            if (subject, predicate, old_value) not in self.g_set:
                raise Exception(
                    f"Triple ({subject}, {predicate}, {old_value}) does not exist"
                )
            self.g_set.remove((subject, predicate, old_value))
            self.g_set.add(
                (subject, predicate, new_value),
                resp_agent=self.resp_agent,
                primary_source=self.source,
            )
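
    # Example (illustrative URIs); raises if the old triple is absent:
    #   editor.update(
    #       URIRef("https://example.org/br/1"),
    #       URIRef("http://purl.org/dc/terms/title"),
    #       Literal("Old title"),
    #       Literal("New title"),
    #   )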

    def delete(
        self,
        subject: URIRef,
        predicate: URIRef | None = None,
        value=None,
        graph: URIRef | Graph | str | None = None,
    ) -> None:
        # Normalize the parameters
        subject, predicate, graph = self._normalize_params(subject, predicate, graph)

        if predicate is None:
            # Delete the entire entity, checking first that it exists
            if self.dataset_is_quadstore:
                quads = list(self.g_set.quads((subject, None, None, None)))
                if not quads:
                    raise Exception(f"Entity {subject} does not exist")
                for quad in quads:
                    self.g_set.remove(quad)

                # Also remove any quads where this entity is the object
                object_quads = list(self.g_set.quads((None, None, subject, None)))
                for quad in object_quads:
                    self.g_set.remove(quad)
            else:
                triples = list(self.g_set.triples((subject, None, None)))
                if not triples:
                    raise Exception(f"Entity {subject} does not exist")
                for triple in triples:
                    self.g_set.remove(triple)

                # Also remove any triples where this entity is the object
                object_triples = list(self.g_set.triples((None, None, subject)))
                for triple in object_triples:
                    self.g_set.remove(triple)
            self.g_set.mark_as_deleted(subject)
        else:
            if value is not None:
                # Check that the specific triple/quad exists before removing it
                if self.dataset_is_quadstore and graph:
                    if (subject, predicate, value, graph) not in self.g_set:
                        raise Exception(
                            f"Triple ({subject}, {predicate}, {value}, {graph}) does not exist"
                        )
                    self.g_set.remove((subject, predicate, value, graph))
                else:
                    if (subject, predicate, value) not in self.g_set:
                        raise Exception(
                            f"Triple ({subject}, {predicate}, {value}) does not exist"
                        )
                    self.g_set.remove((subject, predicate, value))
            else:
                # Check that triples with the given subject and predicate exist
                if self.dataset_is_quadstore and graph:
                    quads = list(self.g_set.quads((subject, predicate, None, graph)))
                    if not quads:
                        raise Exception(
                            f"No triples found with subject {subject} and predicate {predicate} in graph {graph}"
                        )
                    for quad in quads:
                        self.g_set.remove(quad)
                else:
                    triples = list(self.g_set.triples((subject, predicate, None)))
                    if not triples:
                        raise Exception(
                            f"No triples found with subject {subject} and predicate {predicate}"
                        )
                    for triple in triples:
                        self.g_set.remove(triple)

            # If the entity has no remaining triples, mark it as deleted
            if len(list(self.g_set.triples((subject, None, None)))) == 0:
                self.g_set.mark_as_deleted(subject)
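
    # Three deletion modes (illustrative):
    #   editor.delete(subject)                    # remove the whole entity
    #   editor.delete(subject, predicate)         # remove every value of a property
    #   editor.delete(subject, predicate, value)  # remove one specific triple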

    def import_entity(self, subject):
        Reader.import_entities_from_triplestore(
            self.g_set, self.dataset_endpoint, [subject]
        )
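
    # Note: import_entity pulls an entity's current triples from the dataset
    # endpoint into the local graph set, so later update() and delete() calls
    # can verify the triples they touch.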

    def merge(self, keep_entity_uri: str, delete_entity_uri: str) -> None:
        """
        Merge one entity into another within the dataset.

        The entity identified by delete_entity_uri is removed, and its
        properties and incoming references are transferred to
        keep_entity_uri. All operations are performed within the local graph
        set managed by this Editor instance and then saved, ensuring
        provenance capture.

        Args:
            keep_entity_uri: The URI of the entity to keep.
            delete_entity_uri: The URI of the entity to delete and merge from.

        Raises:
            ValueError: If keep_entity_uri and delete_entity_uri are the same.
            Exception: If errors occur during SPARQL queries or graph operations.
        """
        keep_uri, _, _ = self._normalize_params(keep_entity_uri)
        delete_uri, _, _ = self._normalize_params(delete_entity_uri)

        if keep_uri == delete_uri:
            raise ValueError("Cannot merge an entity with itself.")

        sparql = SPARQLWrapper(self.dataset_endpoint)
        entities_to_import: Set[URIRef] = {keep_uri, delete_uri}
        incoming_triples_to_update: List[tuple[URIRef, URIRef]] = []
        outgoing_triples_to_move: List[tuple[URIRef, Literal | URIRef]] = []

        try:
            # 1. Find incoming references to delete_uri.
            # Fetch subjects and predicates pointing to the entity to be deleted.
            query_incoming = f"SELECT DISTINCT ?s ?p WHERE {{ ?s ?p <{delete_uri}> . FILTER (?s != <{keep_uri}>) }}"
            sparql.setQuery(query_incoming)
            sparql.setReturnFormat(JSON)
            results_incoming = sparql.query().convert()
            for result in results_incoming["results"]["bindings"]:
                s_uri = URIRef(result["s"]["value"])
                p_uri = URIRef(result["p"]["value"])
                incoming_triples_to_update.append((s_uri, p_uri))
                entities_to_import.add(s_uri)  # Ensure referencing entities are loaded

            # 2. Find outgoing properties of delete_uri (excluding rdf:type).
            # Fetch predicates and objects of the entity to be deleted.
            query_outgoing = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                SELECT DISTINCT ?p ?o WHERE {{
                    <{delete_uri}> ?p ?o .
                    FILTER (?p != rdf:type)
                }}
            """
            sparql.setQuery(query_outgoing)
            sparql.setReturnFormat(JSON)
            results_outgoing = sparql.query().convert()
            for result in results_outgoing["results"]["bindings"]:
                p_uri = URIRef(result["p"]["value"])
                o_node = result["o"]
                o_val: Literal | URIRef | None = None
                if o_node["type"] == "uri":
                    o_val = URIRef(o_node["value"])
                    entities_to_import.add(o_val)  # Ensure referenced entities are loaded
                elif o_node["type"] in {"literal", "typed-literal"}:
                    o_val = Literal(
                        o_node["value"],
                        lang=o_node.get("xml:lang"),
                        datatype=URIRef(o_node["datatype"]) if o_node.get("datatype") else None,
                    )
                else:  # Blank nodes are skipped for now
                    print(
                        f"Warning: Skipping non-URI/Literal object type "
                        f"'{o_node['type']}' from {delete_uri} via {p_uri}"
                    )
                    continue
                if o_val is not None:
                    outgoing_triples_to_move.append((p_uri, o_val))

            # 3. Import all involved entities into the local graph set.
            # This brings the current state of these entities from the
            # triplestore into the Editor's context for modification.
            if entities_to_import:
                Reader.import_entities_from_triplestore(
                    self.g_set, self.dataset_endpoint, list(entities_to_import)
                )

            # Mark the start of modifications (preexisting_finished pattern)
            self.g_set.preexisting_finished(self.resp_agent, self.source, self.c_time)

            # 4. Perform the merge using the built-in function.
            # This handles moving triples and updating the internal
            # merge_index and entity_index for provenance generation.
            self.g_set.merge(keep_uri, delete_uri)

            # 5. Save changes and provenance.
            # This uploads the modified local graph and the generated
            # provenance graph.
            self.save()

        except Exception as e:
            print(f"Error during merge operation for {keep_uri} and {delete_uri}: {e}")
            print(traceback.format_exc())
            # Avoid committing partial changes by not calling save()
            raise  # Re-raise the exception to signal failure
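
    # Example (illustrative URIs):
    #   editor.merge(
    #       "https://example.org/ra/1",  # entity to keep
    #       "https://example.org/ra/2",  # entity to merge and delete
    #   )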

    def preexisting_finished(self):
        self.g_set.preexisting_finished(self.resp_agent, self.source, self.c_time)

    def save(self):
        self.g_set.generate_provenance()
        dataset_storer = Storer(self.g_set)
        prov_storer = Storer(self.g_set.provenance)
        dataset_storer.upload_all(self.dataset_endpoint)
        prov_storer.upload_all(self.provenance_endpoint)
        self.g_set.commit_changes()
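
    # Typical editing session (illustrative):
    #   editor.import_entity(subject)   # load the current state
    #   editor.preexisting_finished()   # snapshot the pre-edit state
    #   editor.update(subject, predicate, old_value, new_value)
    #   editor.save()                   # upload data + provenance, then commit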

    def to_posix_timestamp(self, value: str | datetime | None) -> float | None:
        if value is None:
            return None
        elif isinstance(value, datetime):
            return value.timestamp()
        elif isinstance(value, str):
            dt = datetime.fromisoformat(value)
            return dt.timestamp()
        raise TypeError(f"Unsupported c_time value: {value!r}")
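
# Accepted c_time values (illustrative):
#   Editor(..., c_time=datetime(2025, 4, 18, 11, 10))  # POSIX float via .timestamp()
#   Editor(..., c_time="2025-04-18T11:10:00")          # parsed with datetime.fromisoformat
#   Editor(..., c_time=None)                           # stays None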