Coverage for heritrace/scripts/reset_provenance.py: 100%
123 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-06-24 11:39 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-06-24 11:39 +0000
1#!/usr/bin/env python3
3import argparse
4import importlib.util
5import logging
6import sys
7from typing import Union
8from urllib.parse import urlparse
10from heritrace.extensions import SPARQLWrapperWithRetry
11from heritrace.utils.converters import convert_to_datetime
12from rdflib import URIRef
13from rdflib_ocdm.counter_handler.counter_handler import CounterHandler
14from SPARQLWrapper import JSON
class ProvenanceResetter:
    """
    Reset the provenance of a specific entity back to its first snapshot.

    A reset deletes every snapshot generated after the earliest one, removes the
    invalidatedAtTime property from that earliest snapshot, and sets the entity's
    provenance counter back to 1.
    """

    def __init__(
        self,
        provenance_endpoint: str,
        counter_handler: "CounterHandler",
    ):
        """
        Initialize the ProvenanceResetter.

        Args:
            provenance_endpoint: The SPARQL endpoint for the provenance database
            counter_handler: An instance of a CounterHandler to manage provenance counters
        """
        self.provenance_endpoint = provenance_endpoint
        self.provenance_sparql = SPARQLWrapperWithRetry(provenance_endpoint)
        self.provenance_sparql.setReturnFormat(JSON)
        self.counter_handler = counter_handler
        self.logger = logging.getLogger(__name__)

    def reset_entity_provenance(self, entity_uri: Union[str, "URIRef"]) -> bool:
        """
        Reset the provenance of a specific entity by deleting all snapshots
        after snapshot 1, removing the invalidatedAtTime property from the first snapshot,
        and resetting the provenance counters.

        Args:
            entity_uri: The URI of the entity to reset

        Returns:
            bool: True only if every step succeeded. False if the entity has no
            snapshots, if any snapshot could not be deleted, or if
            invalidatedAtTime could not be removed from the first snapshot.
        """
        if not isinstance(entity_uri, URIRef):
            entity_uri = URIRef(entity_uri)

        # Step 1: Find all snapshots for the entity
        snapshots = self._get_entity_snapshots(entity_uri)
        if not snapshots:
            self.logger.warning(f"No snapshots found for entity {entity_uri}")
            return False

        # Sort on parsed datetimes rather than on the raw strings returned by the endpoint
        sorted_snapshots = sorted(
            snapshots, key=lambda x: convert_to_datetime(x["generation_time"])
        )

        # Keep only the first snapshot
        first_snapshot, *snapshots_to_delete = sorted_snapshots

        if not snapshots_to_delete:
            self.logger.info(f"Entity {entity_uri} has only one snapshot, nothing to reset")
            # Still remove invalidatedAtTime from the first snapshot, and report
            # a failure of that removal instead of claiming success
            return bool(self._remove_invalidated_time(first_snapshot))

        # Step 2: Delete all snapshots after the first one
        if not self._delete_snapshots(snapshots_to_delete):
            return False

        # Step 3: Reset the provenance counter for this entity
        self._reset_provenance_counter(entity_uri)

        # Step 4: Remove invalidatedAtTime from the first snapshot; the helper logs
        # the error itself, so a failure only needs to be propagated here
        if not self._remove_invalidated_time(first_snapshot):
            return False

        self.logger.info(f"Successfully reset provenance for entity {entity_uri}")
        return True

    def _get_entity_snapshots(self, entity_uri: "URIRef") -> list:
        """
        Get all snapshots for a specific entity.

        Args:
            entity_uri: The URI of the entity

        Returns:
            list: One dictionary per snapshot with the keys "uri" and
            "generation_time", both taken as strings from the query result
        """
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            SELECT ?snapshot ?generation_time
            WHERE {{
                GRAPH ?g {{
                    ?snapshot prov:specializationOf <{entity_uri}> ;
                              prov:generatedAtTime ?generation_time .
                }}
            }}
            ORDER BY ?generation_time
        """

        self.provenance_sparql.setQuery(query)
        results = self.provenance_sparql.queryAndConvert()

        return [
            {
                "uri": binding["snapshot"]["value"],
                "generation_time": binding["generation_time"]["value"],
            }
            for binding in results["results"]["bindings"]
        ]

    @staticmethod
    def _snapshot_graph_uri(snapshot_uri: str) -> str:
        """
        Build the graph name used for a snapshot.

        The graph name is the part of the snapshot URI that precedes '/prov/se/',
        followed by '/prov/'.
        """
        return f"{snapshot_uri.split('/prov/se/')[0]}/prov/"

    def _run_update(self, query: str) -> None:
        """Send an update query to the provenance endpoint using the POST method."""
        self.provenance_sparql.setQuery(query)
        self.provenance_sparql.method = "POST"
        self.provenance_sparql.query()

    def _delete_snapshots(self, snapshots: list) -> bool:
        """
        Delete a list of snapshots from the provenance database.

        Every snapshot is attempted even if an earlier one fails.

        Args:
            snapshots: A list of snapshot dictionaries to delete

        Returns:
            bool: True if every snapshot was deleted, False if at least one failed
        """
        if not snapshots:
            return True

        # Virtuoso has limitations with DELETE WHERE queries
        # We need to delete each snapshot individually
        success = True
        for snapshot in snapshots:
            snapshot_uri = snapshot['uri']
            graph_uri = self._snapshot_graph_uri(snapshot_uri)

            # Triples where the snapshot is the subject
            delete_as_subject = f"""
                PREFIX prov: <http://www.w3.org/ns/prov#>

                DELETE {{
                    GRAPH <{graph_uri}> {{
                        <{snapshot_uri}> ?p ?o .
                    }}
                }}
                WHERE {{
                    GRAPH <{graph_uri}> {{
                        <{snapshot_uri}> ?p ?o .
                    }}
                }}
            """

            # Triples where the snapshot is the object
            delete_as_object = f"""
                PREFIX prov: <http://www.w3.org/ns/prov#>

                DELETE {{
                    GRAPH <{graph_uri}> {{
                        ?s ?p <{snapshot_uri}> .
                    }}
                }}
                WHERE {{
                    GRAPH <{graph_uri}> {{
                        ?s ?p <{snapshot_uri}> .
                    }}
                }}
            """

            try:
                self._run_update(delete_as_subject)
                self._run_update(delete_as_object)
                self.logger.debug(f"Successfully deleted snapshot: {snapshot_uri} from graph: {graph_uri}")
            except Exception as e:
                # Best effort: record the failure and keep going with the remaining snapshots
                self.logger.error(f"Error deleting snapshot {snapshot_uri}: {e}")
                success = False

        return success

    def _reset_provenance_counter(self, entity_uri: "URIRef") -> None:
        """
        Reset the provenance counter for a specific entity to 1.

        Args:
            entity_uri: The URI of the entity
        """
        # The counter is keyed by the last segment of the URI path
        parsed_uri = urlparse(str(entity_uri))
        entity_name = parsed_uri.path.split('/')[-1]

        # Set the counter to 1 (for the first snapshot)
        self.counter_handler.set_counter(1, entity_name)
        self.logger.info(f"Reset provenance counter for entity {entity_uri} to 1")

    def _remove_invalidated_time(self, snapshot: dict) -> bool:
        """
        Remove the invalidatedAtTime property from a snapshot.

        Args:
            snapshot: A dictionary containing snapshot information (its "uri" key is read)

        Returns:
            bool: True if the operation was successful, False otherwise
        """
        snapshot_uri = snapshot['uri']
        graph_uri = self._snapshot_graph_uri(snapshot_uri)

        # Delete the invalidatedAtTime property
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            DELETE {{
                GRAPH <{graph_uri}> {{
                    <{snapshot_uri}> prov:invalidatedAtTime ?time .
                }}
            }}
            WHERE {{
                GRAPH <{graph_uri}> {{
                    <{snapshot_uri}> prov:invalidatedAtTime ?time .
                }}
            }}
        """

        try:
            self._run_update(query)
            self.logger.info(f"Successfully removed invalidatedAtTime from snapshot: {snapshot_uri}")
            return True
        except Exception as e:
            self.logger.error(f"Error removing invalidatedAtTime from snapshot {snapshot_uri}: {e}")
            return False
def reset_entity_provenance(
    entity_uri: str,
    provenance_endpoint: str,
    counter_handler: CounterHandler,
) -> bool:
    """
    Convenience wrapper: build a ProvenanceResetter and reset one entity with it.

    The reset deletes all snapshots after snapshot 1, removes the
    invalidatedAtTime property from the first snapshot, and resets the
    provenance counters.

    Args:
        entity_uri: The URI of the entity to reset
        provenance_endpoint: The SPARQL endpoint for the provenance database
        counter_handler: An instance of a CounterHandler to manage provenance counters

    Returns:
        bool: True if the operation was successful, False otherwise
    """
    return ProvenanceResetter(
        provenance_endpoint=provenance_endpoint,
        counter_handler=counter_handler,
    ).reset_entity_provenance(entity_uri)
def load_config(config_path):
    """
    Load configuration from a Python file.

    The file is executed as a module named "config". Any failure while locating
    or executing it is logged and terminates the process with exit status 1.

    Args:
        config_path: Path to the configuration file

    Returns:
        module: The loaded configuration module
    """
    try:
        spec = importlib.util.spec_from_file_location("config", config_path)
        # No spec (or no loader) is produced when the path cannot be handled as
        # Python source, e.g. an unrecognised file suffix; report that clearly
        # instead of failing later with an AttributeError on None
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot load {config_path!r} as a Python module")
        config = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(config)
        return config
    except Exception as e:
        # Broad on purpose: executing the config file can raise anything, and
        # this is the command-line boundary where it is reported
        logging.error(f"Error loading configuration file: {e}")
        sys.exit(1)
def main():
    """
    Command-line entry point.

    Parses the arguments, configures logging, loads the configuration file,
    validates that it defines Config.PROVENANCE_DB_URL and
    Config.COUNTER_HANDLER, then resets the provenance of the given entity.

    Returns:
        int: 0 if the reset succeeded, 1 on a configuration error or a failed reset
    """
    cli = argparse.ArgumentParser(description="Reset the provenance of a specific entity")
    cli.add_argument("entity_uri", help="URI of the entity to reset")
    cli.add_argument("--config", "-c", required=True, help="Path to the configuration file")
    cli.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
    options = cli.parse_args()

    # Verbose mode lowers the threshold to DEBUG
    logging.basicConfig(
        level=logging.DEBUG if options.verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    config_module = load_config(options.config)

    # The configuration file must expose a Config class ...
    if not hasattr(config_module, "Config"):
        logging.error("Configuration file must define a Config class")
        return 1

    # ... carrying the provenance endpoint ...
    if not hasattr(config_module.Config, "PROVENANCE_DB_URL"):
        logging.error("Config class must define PROVENANCE_DB_URL")
        return 1
    endpoint = config_module.Config.PROVENANCE_DB_URL

    # ... and the counter handler
    if not hasattr(config_module.Config, "COUNTER_HANDLER"):
        logging.error("Config class must define COUNTER_HANDLER")
        return 1
    handler = config_module.Config.COUNTER_HANDLER

    was_reset = reset_entity_provenance(
        entity_uri=options.entity_uri,
        provenance_endpoint=endpoint,
        counter_handler=handler,
    )

    if not was_reset:
        logging.error(f"Failed to reset provenance for entity {options.entity_uri}")
        return 1

    logging.info(f"Successfully reset provenance for entity {options.entity_uri}")
    return 0
if __name__ == "__main__":  # pragma: no cover
    # Propagate main()'s return value as the process exit status
    exit_status = main()
    sys.exit(exit_status)