Coverage for heritrace/scripts/reset_provenance.py: 100%
122 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-18 11:10 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-18 11:10 +0000
1#!/usr/bin/env python3
3import argparse
4import importlib.util
5import logging
6import sys
7from typing import Union
8from urllib.parse import urlparse
10from rdflib import URIRef
11from SPARQLWrapper import JSON, SPARQLWrapper
12from rdflib_ocdm.counter_handler.counter_handler import CounterHandler
13from heritrace.utils.converters import convert_to_datetime
class ProvenanceResetter:
    """
    Reset the provenance of a single entity.

    Resetting means: delete every snapshot after the first one, remove the
    ``prov:invalidatedAtTime`` triple from the first snapshot, and set the
    entity's provenance counter back to 1.
    """

    def __init__(
        self,
        provenance_endpoint: str,
        counter_handler: "CounterHandler",
    ):
        """
        Initialize the ProvenanceResetter.

        Args:
            provenance_endpoint: The SPARQL endpoint for the provenance database
            counter_handler: An instance of a CounterHandler to manage provenance counters
        """
        self.provenance_endpoint = provenance_endpoint
        self.provenance_sparql = SPARQLWrapper(provenance_endpoint)
        self.provenance_sparql.setReturnFormat(JSON)
        self.counter_handler = counter_handler
        self.logger = logging.getLogger(__name__)

    def reset_entity_provenance(self, entity_uri: Union[str, "URIRef"]) -> bool:
        """
        Reset the provenance of a specific entity.

        Snapshots are ordered by generation time; the earliest one is kept and
        all later ones are deleted. The provenance counter is then set to 1 and
        ``prov:invalidatedAtTime`` is removed from the kept snapshot.

        Args:
            entity_uri: The URI of the entity to reset

        Returns:
            bool: True only if every step succeeded. False if the entity has no
            snapshots, if any snapshot deletion failed, or if the
            invalidatedAtTime removal on the first snapshot failed.
        """
        if not isinstance(entity_uri, URIRef):
            entity_uri = URIRef(entity_uri)

        # Step 1: Find all snapshots for the entity
        snapshots = self._get_entity_snapshots(entity_uri)
        if not snapshots:
            self.logger.warning(f"No snapshots found for entity {entity_uri}")
            return False

        # Sort by generation time, converting strings to datetime objects so the
        # ordering is chronological rather than lexicographic.
        sorted_snapshots = sorted(
            snapshots, key=lambda x: convert_to_datetime(x["generation_time"])
        )

        # Keep only the first snapshot
        first_snapshot = sorted_snapshots[0]
        snapshots_to_delete = sorted_snapshots[1:]

        if not snapshots_to_delete:
            self.logger.info(f"Entity {entity_uri} has only one snapshot, nothing to reset")
            # Still remove invalidatedAtTime from the first snapshot, and report
            # a failure of that update instead of silently claiming success.
            if not self._remove_invalidated_time(first_snapshot):
                return False
            return True

        # Step 2: Delete all snapshots after the first one
        if not self._delete_snapshots(snapshots_to_delete):
            return False

        # Step 3: Reset the provenance counter for this entity
        self._reset_provenance_counter(entity_uri)

        # Step 4: Remove invalidatedAtTime from the first snapshot; a failed
        # update here means the reset is incomplete, so do not report success.
        if not self._remove_invalidated_time(first_snapshot):
            return False

        self.logger.info(f"Successfully reset provenance for entity {entity_uri}")
        return True

    def _get_entity_snapshots(self, entity_uri: "URIRef") -> list:
        """
        Get all snapshots for a specific entity.

        Args:
            entity_uri: The URI of the entity

        Returns:
            list: One dictionary per snapshot with the keys ``"uri"`` and
            ``"generation_time"`` (both plain strings taken from the bindings)
        """
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            SELECT ?snapshot ?generation_time
            WHERE {{
                GRAPH ?g {{
                    ?snapshot prov:specializationOf <{entity_uri}> ;
                              prov:generatedAtTime ?generation_time .
                }}
            }}
            ORDER BY ?generation_time
        """

        self.provenance_sparql.setQuery(query)
        results = self.provenance_sparql.queryAndConvert()

        return [
            {
                "uri": binding["snapshot"]["value"],
                "generation_time": binding["generation_time"]["value"],
            }
            for binding in results["results"]["bindings"]
        ]

    @staticmethod
    def _provenance_graph_uri(snapshot_uri: str) -> str:
        """Return the graph name for a snapshot: the part before '/prov/se/' plus '/prov/'."""
        return f"{snapshot_uri.split('/prov/se/')[0]}/prov/"

    def _execute_delete(self, graph_uri: str, triple_pattern: str) -> None:
        """Send a DELETE/WHERE update removing triples matching the pattern from one graph."""
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            DELETE {{
                GRAPH <{graph_uri}> {{
                    {triple_pattern}
                }}
            }}
            WHERE {{
                GRAPH <{graph_uri}> {{
                    {triple_pattern}
                }}
            }}
        """
        self.provenance_sparql.setQuery(query)
        self.provenance_sparql.method = "POST"
        self.provenance_sparql.query()

    def _delete_snapshots(self, snapshots: list) -> bool:
        """
        Delete a list of snapshots from the provenance database.

        A failure on one snapshot is logged and does not stop the remaining
        snapshots from being processed.

        Args:
            snapshots: A list of snapshot dictionaries to delete

        Returns:
            bool: True if every snapshot was deleted (or the list was empty),
            False if at least one deletion raised an exception
        """
        if not snapshots:
            return True

        # Virtuoso has limitations with DELETE WHERE queries
        # We need to delete each snapshot individually
        success = True
        for snapshot in snapshots:
            snapshot_uri = snapshot["uri"]
            graph_uri = self._provenance_graph_uri(snapshot_uri)

            try:
                # Delete all triples where the snapshot is the subject
                self._execute_delete(graph_uri, f"<{snapshot_uri}> ?p ?o .")
                # Also delete triples where the snapshot is the object
                self._execute_delete(graph_uri, f"?s ?p <{snapshot_uri}> .")

                self.logger.debug(f"Successfully deleted snapshot: {snapshot_uri} from graph: {graph_uri}")
            except Exception as e:
                self.logger.error(f"Error deleting snapshot {snapshot_uri}: {e}")
                success = False

        return success

    def _reset_provenance_counter(self, entity_uri: "URIRef") -> None:
        """
        Reset the provenance counter for a specific entity to 1.

        Args:
            entity_uri: The URI of the entity
        """
        # The counter is keyed by the last path segment of the entity URI
        parsed_uri = urlparse(str(entity_uri))
        entity_name = parsed_uri.path.split("/")[-1]

        # Set the counter to 1 (for the first snapshot)
        self.counter_handler.set_counter(1, entity_name)
        self.logger.info(f"Reset provenance counter for entity {entity_uri} to 1")

    def _remove_invalidated_time(self, snapshot: dict) -> bool:
        """
        Remove the invalidatedAtTime property from a snapshot.

        Args:
            snapshot: A dictionary containing snapshot information (reads ``"uri"``)

        Returns:
            bool: True if the update was sent without raising, False otherwise
        """
        snapshot_uri = snapshot["uri"]
        graph_uri = self._provenance_graph_uri(snapshot_uri)

        try:
            self._execute_delete(
                graph_uri, f"<{snapshot_uri}> prov:invalidatedAtTime ?time ."
            )
            self.logger.info(f"Successfully removed invalidatedAtTime from snapshot: {snapshot_uri}")
            return True
        except Exception as e:
            self.logger.error(f"Error removing invalidatedAtTime from snapshot {snapshot_uri}: {e}")
            return False
def reset_entity_provenance(
    entity_uri: str,
    provenance_endpoint: str,
    counter_handler: CounterHandler,
) -> bool:
    """
    Convenience wrapper: build a ProvenanceResetter and reset one entity.

    Args:
        entity_uri: The URI of the entity to reset
        provenance_endpoint: The SPARQL endpoint for the provenance database
        counter_handler: An instance of a CounterHandler to manage provenance counters

    Returns:
        bool: Whatever ProvenanceResetter.reset_entity_provenance returns for
        this entity (True on success, False otherwise)
    """
    return ProvenanceResetter(
        provenance_endpoint=provenance_endpoint,
        counter_handler=counter_handler,
    ).reset_entity_provenance(entity_uri)
def load_config(config_path):
    """
    Load configuration from a Python file.

    The file is executed as a module named ``config``. Any failure (unreadable
    path, a path Python has no loader for, or an exception raised while the
    file runs) is logged and terminates the process with exit status 1.

    Args:
        config_path: Path to the configuration file

    Returns:
        module: The loaded configuration module
    """
    try:
        spec = importlib.util.spec_from_file_location("config", config_path)
        # spec_from_file_location returns None when no loader matches the path
        # (e.g. an unrecognised file extension); report that explicitly rather
        # than failing later with an opaque AttributeError on None.
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot load a Python module from {config_path!r}")
        config = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(config)
    except Exception as e:
        logging.error(f"Error loading configuration file: {e}")
        sys.exit(1)
    return config
def main():
    """
    Command-line entry point.

    Parses the entity URI and options, loads the configuration module, checks
    that it defines ``Config`` with ``PROVENANCE_DB_URL`` and
    ``COUNTER_HANDLER``, and runs the reset.

    Returns:
        int: 0 when the reset succeeded, 1 on a configuration problem or a
        failed reset
    """
    cli = argparse.ArgumentParser(description="Reset the provenance of a specific entity")
    cli.add_argument("entity_uri", help="URI of the entity to reset")
    cli.add_argument("--config", "-c", required=True, help="Path to the configuration file")
    cli.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
    options = cli.parse_args()

    # Verbose mode lowers the threshold to DEBUG; otherwise INFO
    logging.basicConfig(
        level=logging.DEBUG if options.verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    config = load_config(options.config)

    # The configuration module must expose a Config class
    if not hasattr(config, "Config"):
        logging.error("Configuration file must define a Config class")
        return 1

    # Config must provide both the endpoint and the counter handler
    settings = config.Config
    for required in ("PROVENANCE_DB_URL", "COUNTER_HANDLER"):
        if not hasattr(settings, required):
            logging.error(f"Config class must define {required}")
            return 1

    succeeded = reset_entity_provenance(
        entity_uri=options.entity_uri,
        provenance_endpoint=settings.PROVENANCE_DB_URL,
        counter_handler=settings.COUNTER_HANDLER,
    )

    if not succeeded:
        logging.error(f"Failed to reset provenance for entity {options.entity_uri}")
        return 1

    logging.info(f"Successfully reset provenance for entity {options.entity_uri}")
    return 0
# Script entry: use main()'s return value as the process exit status.
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())