Coverage for heritrace / scripts / reset_provenance.py: 100%
123 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
1#!/usr/bin/env python3
3# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7import argparse
8import importlib.util
9import logging
10import sys
11from typing import Union
12from urllib.parse import urlparse
14from heritrace.extensions import SPARQLWrapperWithRetry
15from heritrace.utils.converters import convert_to_datetime
16from rdflib import URIRef
17from rdflib_ocdm.counter_handler.counter_handler import CounterHandler
18from SPARQLWrapper import JSON
class ProvenanceResetter:
    """
    Reset the provenance of a specific entity.

    All snapshots after the first one (ordered by generation time) are deleted,
    ``prov:invalidatedAtTime`` is removed from the remaining snapshot, and the
    provenance counter of the entity is set back to 1.
    """

    def __init__(
        self,
        provenance_endpoint: str,
        counter_handler: CounterHandler,
    ):
        """
        Initialize the ProvenanceResetter.

        Args:
            provenance_endpoint: The SPARQL endpoint for the provenance database
            counter_handler: An instance of a CounterHandler to manage provenance counters
        """
        self.provenance_endpoint = provenance_endpoint
        self.provenance_sparql = SPARQLWrapperWithRetry(provenance_endpoint)
        self.provenance_sparql.setReturnFormat(JSON)
        self.counter_handler = counter_handler
        self.logger = logging.getLogger(__name__)

    def reset_entity_provenance(self, entity_uri: Union[str, URIRef]) -> bool:
        """
        Reset the provenance of a specific entity by deleting all snapshots
        after snapshot 1, removing the invalidatedAtTime property from the first
        snapshot, and resetting the provenance counters.

        Args:
            entity_uri: The URI of the entity to reset

        Returns:
            bool: True only if every step succeeded. False if no snapshot was
            found, if any snapshot deletion failed, or if invalidatedAtTime
            could not be removed from the first snapshot.
        """
        if not isinstance(entity_uri, URIRef):
            entity_uri = URIRef(entity_uri)

        # Step 1: Find all snapshots for the entity
        snapshots = self._get_entity_snapshots(entity_uri)
        if not snapshots:
            self.logger.warning(f"No snapshots found for entity {entity_uri}")
            return False

        # Sort snapshots by generation time, converting strings to datetime objects
        sorted_snapshots = sorted(
            snapshots, key=lambda x: convert_to_datetime(x["generation_time"])
        )

        # Keep only the first snapshot
        first_snapshot = sorted_snapshots[0]
        snapshots_to_delete = sorted_snapshots[1:]

        if not snapshots_to_delete:
            self.logger.info(f"Entity {entity_uri} has only one snapshot, nothing to reset")
            # Still remove invalidatedAtTime from the first snapshot, and report
            # a failure of that update instead of silently claiming success.
            if not self._remove_invalidated_time(first_snapshot):
                return False
            return True

        # Step 2: Delete all snapshots after the first one
        if not self._delete_snapshots(snapshots_to_delete):
            return False

        # Step 3: Reset the provenance counter for this entity
        self._reset_provenance_counter(entity_uri)

        # Step 4: Remove invalidatedAtTime from the first snapshot.
        # A failure here leaves the surviving snapshot marked as invalidated,
        # so the reset as a whole must not be reported as successful.
        if not self._remove_invalidated_time(first_snapshot):
            return False

        self.logger.info(f"Successfully reset provenance for entity {entity_uri}")
        return True

    @staticmethod
    def _provenance_graph_uri(snapshot_uri: str) -> str:
        """Return the graph name for a snapshot: the part before '/prov/se/' plus '/prov/'."""
        return f"{snapshot_uri.split('/prov/se/')[0]}/prov/"

    def _run_update(self, query: str) -> None:
        """Send a SPARQL update to the provenance endpoint using POST; errors propagate."""
        self.provenance_sparql.setQuery(query)
        self.provenance_sparql.method = "POST"
        self.provenance_sparql.query()

    def _get_entity_snapshots(self, entity_uri: URIRef) -> list:
        """
        Get all snapshots for a specific entity.

        Args:
            entity_uri: The URI of the entity

        Returns:
            list: A list of dictionaries with the keys "uri" and
            "generation_time", both taken from the query bindings as strings
        """
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            SELECT ?snapshot ?generation_time
            WHERE {{
                GRAPH ?g {{
                    ?snapshot prov:specializationOf <{entity_uri}> ;
                              prov:generatedAtTime ?generation_time .
                }}
            }}
            ORDER BY ?generation_time
        """

        self.provenance_sparql.setQuery(query)
        results = self.provenance_sparql.queryAndConvert()

        return [
            {
                "uri": binding["snapshot"]["value"],
                "generation_time": binding["generation_time"]["value"],
            }
            for binding in results["results"]["bindings"]
        ]

    def _delete_snapshots(self, snapshots: list) -> bool:
        """
        Delete a list of snapshots from the provenance database.

        Every snapshot is attempted even if an earlier one failed.

        Args:
            snapshots: A list of snapshot dictionaries to delete

        Returns:
            bool: True if every snapshot was deleted without error, False otherwise
        """
        if not snapshots:
            return True

        # Virtuoso has limitations with DELETE WHERE queries
        # We need to delete each snapshot individually
        success = True
        for snapshot in snapshots:
            snapshot_uri = snapshot['uri']

            # The graph name follows the pattern: <entity part of the snapshot URI>/prov/
            graph_uri = self._provenance_graph_uri(snapshot_uri)

            # Triples where the snapshot is the subject
            as_subject_query = f"""
                PREFIX prov: <http://www.w3.org/ns/prov#>

                DELETE {{
                    GRAPH <{graph_uri}> {{
                        <{snapshot_uri}> ?p ?o .
                    }}
                }}
                WHERE {{
                    GRAPH <{graph_uri}> {{
                        <{snapshot_uri}> ?p ?o .
                    }}
                }}
            """

            # Triples where the snapshot is the object
            as_object_query = f"""
                PREFIX prov: <http://www.w3.org/ns/prov#>

                DELETE {{
                    GRAPH <{graph_uri}> {{
                        ?s ?p <{snapshot_uri}> .
                    }}
                }}
                WHERE {{
                    GRAPH <{graph_uri}> {{
                        ?s ?p <{snapshot_uri}> .
                    }}
                }}
            """

            try:
                self._run_update(as_subject_query)
                self._run_update(as_object_query)
                self.logger.debug(f"Successfully deleted snapshot: {snapshot_uri} from graph: {graph_uri}")
            except Exception as e:
                # Best effort: record the failure and keep going with the remaining snapshots
                self.logger.error(f"Error deleting snapshot {snapshot_uri}: {e}")
                success = False

        return success

    def _reset_provenance_counter(self, entity_uri: URIRef) -> None:
        """
        Reset the provenance counter for a specific entity to 1.

        Args:
            entity_uri: The URI of the entity
        """
        # The counter is keyed by the last path segment of the entity URI
        parsed_uri = urlparse(str(entity_uri))
        entity_name = parsed_uri.path.split('/')[-1]

        # Set the counter to 1 (for the first snapshot)
        self.counter_handler.set_counter(1, entity_name)
        self.logger.info(f"Reset provenance counter for entity {entity_uri} to 1")

    def _remove_invalidated_time(self, snapshot: dict) -> bool:
        """
        Remove the invalidatedAtTime property from a snapshot.

        Args:
            snapshot: A dictionary containing snapshot information (the "uri" key is read)

        Returns:
            bool: True if the update ran without raising, False otherwise
        """
        snapshot_uri = snapshot['uri']
        graph_uri = self._provenance_graph_uri(snapshot_uri)

        # Delete the invalidatedAtTime property
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            DELETE {{
                GRAPH <{graph_uri}> {{
                    <{snapshot_uri}> prov:invalidatedAtTime ?time .
                }}
            }}
            WHERE {{
                GRAPH <{graph_uri}> {{
                    <{snapshot_uri}> prov:invalidatedAtTime ?time .
                }}
            }}
        """

        try:
            self._run_update(query)
            self.logger.info(f"Successfully removed invalidatedAtTime from snapshot: {snapshot_uri}")
            return True
        except Exception as e:
            self.logger.error(f"Error removing invalidatedAtTime from snapshot {snapshot_uri}: {e}")
            return False
def reset_entity_provenance(
    entity_uri: str,
    provenance_endpoint: str,
    counter_handler: CounterHandler,
) -> bool:
    """
    Convenience wrapper that builds a ProvenanceResetter and resets one entity.

    Args:
        entity_uri: The URI of the entity to reset
        provenance_endpoint: The SPARQL endpoint for the provenance database
        counter_handler: An instance of a CounterHandler to manage provenance counters

    Returns:
        bool: The value returned by ProvenanceResetter.reset_entity_provenance
        (True if the operation was successful, False otherwise)
    """
    return ProvenanceResetter(
        provenance_endpoint=provenance_endpoint,
        counter_handler=counter_handler,
    ).reset_entity_provenance(entity_uri)
def load_config(config_path):
    """
    Load configuration from a Python file.

    The file is executed as a module named "config". Any failure while
    locating or executing it is logged and terminates the process with
    exit status 1.

    Args:
        config_path: Path to the configuration file

    Returns:
        module: The loaded configuration module
    """
    try:
        spec = importlib.util.spec_from_file_location("config", config_path)
        # spec_from_file_location returns None when no loader matches the path
        # (for example an unrecognised file extension). Report that explicitly
        # instead of failing later with an opaque AttributeError on None.
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot load {config_path!r} as a Python module")
        config = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(config)
        return config
    except Exception as e:
        logging.error(f"Error loading configuration file: {e}")
        sys.exit(1)
def main():
    """
    Command-line entry point: parse arguments, load the configuration file
    and reset the provenance of the requested entity.

    Returns:
        int: 0 if the reset succeeded, 1 if the configuration is incomplete
        or the reset failed
    """
    cli = argparse.ArgumentParser(description="Reset the provenance of a specific entity")
    cli.add_argument("entity_uri", help="URI of the entity to reset")
    cli.add_argument("--config", "-c", required=True, help="Path to the configuration file")
    cli.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
    args = cli.parse_args()

    # Verbose mode lowers the root logger threshold to DEBUG
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    config = load_config(args.config)

    # The configuration module must expose a Config class...
    if not hasattr(config, "Config"):
        logging.error("Configuration file must define a Config class")
        return 1
    settings = config.Config

    # ...which in turn must provide the endpoint and the counter handler
    requirements = (
        ("PROVENANCE_DB_URL", "Config class must define PROVENANCE_DB_URL"),
        ("COUNTER_HANDLER", "Config class must define COUNTER_HANDLER"),
    )
    for attribute, complaint in requirements:
        if not hasattr(settings, attribute):
            logging.error(complaint)
            return 1

    outcome = reset_entity_provenance(
        entity_uri=args.entity_uri,
        provenance_endpoint=settings.PROVENANCE_DB_URL,
        counter_handler=settings.COUNTER_HANDLER,
    )

    if not outcome:
        logging.error(f"Failed to reset provenance for entity {args.entity_uri}")
        return 1

    logging.info(f"Successfully reset provenance for entity {args.entity_uri}")
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())