Coverage for heritrace / scripts / reset_provenance.py: 98%
126 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import importlib.util
7import logging
8import sys
9import types
10from datetime import datetime, timezone
11from urllib.parse import urlparse
13from rdflib import URIRef
14from rdflib_ocdm.counter_handler.counter_handler import CounterHandler
15from SPARQLWrapper import JSON
16from SPARQLWrapper.SPARQLExceptions import SPARQLWrapperException
18from heritrace.sparql import SPARQLWrapperWithRetry, get_sparql_bindings
19from heritrace.utils.converters import convert_to_datetime
21logger = logging.getLogger(__name__)
class ProvenanceResetter:
    """
    Reset the provenance of a single entity.

    Every snapshot after the earliest one is deleted from the provenance
    database, the entity's provenance counter is set back to 1, and the
    ``prov:invalidatedAtTime`` statement is removed from the surviving
    snapshot.
    """

    def __init__(
        self,
        provenance_endpoint: str,
        counter_handler: CounterHandler,
    ) -> None:
        """
        Initialize the ProvenanceResetter.

        Args:
            provenance_endpoint: The SPARQL endpoint for the provenance database
            counter_handler: An instance of a CounterHandler to manage provenance
                counters
        """
        self.provenance_endpoint = provenance_endpoint
        self.provenance_sparql = SPARQLWrapperWithRetry(provenance_endpoint)
        self.provenance_sparql.setReturnFormat(JSON)
        self.counter_handler = counter_handler
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _graph_uri_for(snapshot_uri: str) -> str:
        """
        Return the URI of the named graph that holds a snapshot.

        The graph URI is the part of the snapshot URI that precedes
        ``/prov/se/`` followed by ``/prov/``.
        """
        return f"{snapshot_uri.split('/prov/se/')[0]}/prov/"

    def _run_update(self, query: str) -> None:
        """Send a SPARQL update to the provenance endpoint using POST."""
        self.provenance_sparql.setQuery(query)
        self.provenance_sparql.method = "POST"
        self.provenance_sparql.query()

    def reset_entity_provenance(self, entity_uri: URIRef) -> bool:
        """
        Reset the provenance of an entity to its first snapshot.

        Args:
            entity_uri: The URI of the entity

        Returns:
            bool: True if every step succeeded; False if the entity has no
                snapshots, if a snapshot could not be deleted, or if
                invalidatedAtTime could not be removed from the first snapshot
        """
        # Step 1: Find all snapshots for the entity
        snapshots = self.get_entity_snapshots(entity_uri)
        if not snapshots:
            self.logger.warning("No snapshots found for entity %s", entity_uri)
            return False

        # Sort snapshots by generation time, converting strings to datetime
        # objects; values that cannot be converted sort first.
        sorted_snapshots = sorted(
            snapshots,
            key=lambda x: (
                convert_to_datetime(x["generation_time"])
                or datetime.min.replace(tzinfo=timezone.utc)
            ),
        )

        # Keep only the first snapshot
        first_snapshot, *snapshots_to_delete = sorted_snapshots

        if not snapshots_to_delete:
            self.logger.info(
                "Entity %s has only one snapshot, nothing to reset", entity_uri
            )
            # Still remove invalidatedAtTime from the first snapshot, and
            # report a failure of that update instead of hiding it.
            return self.remove_invalidated_time(first_snapshot)

        # Step 2: Delete all snapshots after the first one
        if not self.delete_snapshots(snapshots_to_delete):
            return False

        # Step 3: Reset the provenance counter for this entity
        self.reset_provenance_counter(entity_uri)

        # Step 4: Remove invalidatedAtTime from the first snapshot. A failed
        # update must not be reported as a successful reset.
        if not self.remove_invalidated_time(first_snapshot):
            return False

        self.logger.info("Successfully reset provenance for entity %s", entity_uri)
        return True

    def get_entity_snapshots(self, entity_uri: URIRef) -> list:
        """
        Get all snapshots for a specific entity.

        Args:
            entity_uri: The URI of the entity

        Returns:
            list: A list of dictionaries with the keys ``uri`` and
                ``generation_time``, one per snapshot
        """
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            SELECT ?snapshot ?generation_time
            WHERE {{
                GRAPH ?g {{
                    ?snapshot prov:specializationOf <{entity_uri}> ;
                        prov:generatedAtTime ?generation_time .
                }}
            }}
            ORDER BY ?generation_time
        """

        self.provenance_sparql.setQuery(query)
        bindings = get_sparql_bindings(self.provenance_sparql.queryAndConvert())

        return [
            {
                "uri": binding["snapshot"]["value"],
                "generation_time": binding["generation_time"]["value"],
            }
            for binding in bindings
        ]

    def delete_snapshots(self, snapshots: list) -> bool:
        """
        Delete a list of snapshots from the provenance database.

        Args:
            snapshots: A list of snapshot dictionaries to delete

        Returns:
            bool: True if every snapshot was deleted, False if at least one
                deletion raised a SPARQLWrapperException
        """
        if not snapshots:
            return True

        # Virtuoso has limitations with DELETE WHERE queries
        # We need to delete each snapshot individually
        success = True
        for snapshot in snapshots:
            snapshot_uri = snapshot["uri"]
            graph_uri = self._graph_uri_for(snapshot_uri)

            # Delete all triples where the snapshot is the subject
            subject_query = f"""
                PREFIX prov: <http://www.w3.org/ns/prov#>

                DELETE {{
                    GRAPH <{graph_uri}> {{
                        <{snapshot_uri}> ?p ?o .
                    }}
                }}
                WHERE {{
                    GRAPH <{graph_uri}> {{
                        <{snapshot_uri}> ?p ?o .
                    }}
                }}
            """

            # Also delete triples where the snapshot is the object
            object_query = f"""
                PREFIX prov: <http://www.w3.org/ns/prov#>

                DELETE {{
                    GRAPH <{graph_uri}> {{
                        ?s ?p <{snapshot_uri}> .
                    }}
                }}
                WHERE {{
                    GRAPH <{graph_uri}> {{
                        ?s ?p <{snapshot_uri}> .
                    }}
                }}
            """

            try:
                self._run_update(subject_query)
                self._run_update(object_query)
                self.logger.debug(
                    "Successfully deleted snapshot: %s from graph: %s",
                    snapshot_uri,
                    graph_uri,
                )
            except SPARQLWrapperException:
                # Keep going so the remaining snapshots are still attempted.
                self.logger.exception("Error deleting snapshot %s", snapshot_uri)
                success = False

        return success

    def reset_provenance_counter(self, entity_uri: URIRef) -> None:
        """
        Reset the provenance counter for a specific entity to 1.

        Args:
            entity_uri: The URI of the entity
        """
        # The counter name is the last path segment of the URI. Trailing
        # slashes are stripped first so that ".../br/1/" yields "1" rather
        # than an empty name.
        parsed_uri = urlparse(str(entity_uri))
        entity_name = parsed_uri.path.rstrip("/").rsplit("/", 1)[-1]

        # Set the counter to 1 (for the first snapshot)
        self.counter_handler.set_counter(1, entity_name)
        self.logger.info("Reset provenance counter for entity %s to 1", entity_uri)

    def remove_invalidated_time(self, snapshot: dict) -> bool:
        """
        Remove the invalidatedAtTime property from a snapshot.

        Args:
            snapshot: A dictionary containing snapshot information; its
                ``uri`` key is read

        Returns:
            bool: True if the operation was successful, False otherwise
        """
        snapshot_uri = snapshot["uri"]
        graph_uri = self._graph_uri_for(snapshot_uri)

        # Delete the invalidatedAtTime property
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            DELETE {{
                GRAPH <{graph_uri}> {{
                    <{snapshot_uri}> prov:invalidatedAtTime ?time .
                }}
            }}
            WHERE {{
                GRAPH <{graph_uri}> {{
                    <{snapshot_uri}> prov:invalidatedAtTime ?time .
                }}
            }}
        """

        try:
            self._run_update(query)
        except SPARQLWrapperException:
            self.logger.exception(
                "Error removing invalidatedAtTime from snapshot %s", snapshot_uri
            )
            return False

        self.logger.info(
            "Successfully removed invalidatedAtTime from snapshot: %s",
            snapshot_uri,
        )
        return True
def reset_entity_provenance(
    entity_uri: URIRef,
    provenance_endpoint: str,
    counter_handler: CounterHandler,
) -> bool:
    """
    Reset the provenance of an entity using a one-off ProvenanceResetter.

    Args:
        entity_uri: The URI of the entity to reset
        provenance_endpoint: The SPARQL endpoint for the provenance database
        counter_handler: The CounterHandler passed on to the resetter

    Returns:
        bool: The result of ProvenanceResetter.reset_entity_provenance
    """
    return ProvenanceResetter(
        provenance_endpoint=provenance_endpoint,
        counter_handler=counter_handler,
    ).reset_entity_provenance(entity_uri)
def load_config(config_path: str) -> types.ModuleType:
    """
    Import a Python configuration file as a module named ``config``.

    The process exits with status 1 when no module spec (or loader) can be
    created for the path, or when loading raises FileNotFoundError,
    ImportError or AttributeError. Other exceptions propagate to the caller.

    Args:
        config_path: Path to the configuration file

    Returns:
        module: The loaded configuration module
    """
    try:
        module_spec = importlib.util.spec_from_file_location("config", config_path)
        if module_spec is None or module_spec.loader is None:
            logger.error("Failed to create module spec from %s", config_path)
            # SystemExit is not matched by the handler below, so it
            # propagates straight to the caller.
            sys.exit(1)
        loaded_module = importlib.util.module_from_spec(module_spec)
        module_spec.loader.exec_module(loaded_module)
        return loaded_module
    except (FileNotFoundError, ImportError, AttributeError):
        logger.exception("Error loading configuration file: %s", config_path)
        sys.exit(1)
def main() -> int:
    """
    Command-line entry point for resetting an entity's provenance.

    Parses the command line, configures logging, loads the configuration
    file and checks that it defines a ``Config`` class with
    ``PROVENANCE_DB_URL`` and ``COUNTER_HANDLER`` before running the reset.

    Returns:
        int: 0 when the reset succeeded, 1 on a configuration error or a
            failed reset
    """
    cli = argparse.ArgumentParser(
        description="Reset the provenance of a specific entity"
    )
    cli.add_argument("entity_uri", help="URI of the entity to reset")
    cli.add_argument(
        "--config", "-c", required=True, help="Path to the configuration file"
    )
    cli.add_argument(
        "--verbose", "-v", action="store_true", help="Enable verbose logging"
    )
    options = cli.parse_args()

    # Setup logging
    logging.basicConfig(
        level=logging.DEBUG if options.verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    # Load configuration and make sure it exposes a Config class
    config = load_config(options.config)
    if not hasattr(config, "Config"):
        logger.error("Configuration file must define a Config class")
        return 1

    # Required Config attributes, checked and read in this order, each with
    # the error message logged when it is missing.
    required_settings = (
        ("PROVENANCE_DB_URL", "Config class must define PROVENANCE_DB_URL"),
        ("COUNTER_HANDLER", "Config class must define COUNTER_HANDLER"),
    )
    settings = {}
    for attribute, missing_message in required_settings:
        if not hasattr(config.Config, attribute):
            logger.error(missing_message)
            return 1
        settings[attribute] = getattr(config.Config, attribute)

    reset_ok = reset_entity_provenance(
        entity_uri=URIRef(options.entity_uri),
        provenance_endpoint=settings["PROVENANCE_DB_URL"],
        counter_handler=settings["COUNTER_HANDLER"],
    )

    if not reset_ok:
        logger.error("Failed to reset provenance for entity %s", options.entity_uri)
        return 1
    logger.info("Successfully reset provenance for entity %s", options.entity_uri)
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())