Coverage for heritrace/scripts/reset_provenance.py: 100%

123 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-06-24 11:39 +0000

1#!/usr/bin/env python3 

2 

3import argparse 

4import importlib.util 

5import logging 

6import sys 

7from typing import Union 

8from urllib.parse import urlparse 

9 

10from heritrace.extensions import SPARQLWrapperWithRetry 

11from heritrace.utils.converters import convert_to_datetime 

12from rdflib import URIRef 

13from rdflib_ocdm.counter_handler.counter_handler import CounterHandler 

14from SPARQLWrapper import JSON 

15 

16 

class ProvenanceResetter:
    """
    Reset the provenance of a single entity back to its first snapshot.

    A reset deletes every snapshot generated after the earliest one, sets the
    entity's provenance counter to 1 and removes ``prov:invalidatedAtTime``
    from the surviving snapshot.
    """

    def __init__(
        self,
        provenance_endpoint: str,
        counter_handler: CounterHandler,
    ):
        """
        Initialize the ProvenanceResetter.

        Args:
            provenance_endpoint: The SPARQL endpoint for the provenance database
            counter_handler: An instance of a CounterHandler to manage provenance counters
        """
        self.provenance_endpoint = provenance_endpoint
        self.provenance_sparql = SPARQLWrapperWithRetry(provenance_endpoint)
        self.provenance_sparql.setReturnFormat(JSON)
        self.counter_handler = counter_handler
        self.logger = logging.getLogger(__name__)

    def reset_entity_provenance(self, entity_uri: Union[str, URIRef]) -> bool:
        """
        Reset the provenance of a specific entity by deleting all snapshots
        after snapshot 1, removing the invalidatedAtTime property from the first
        snapshot, and resetting the provenance counters.

        The steps run in order and stop at the first failure: if deleting the
        later snapshots fails, the counter is left untouched; if removing
        invalidatedAtTime fails, the counter has already been reset.

        Args:
            entity_uri: The URI of the entity to reset

        Returns:
            bool: True if every step succeeded, False if the entity has no
            snapshots or any update against the provenance store failed
        """
        if not isinstance(entity_uri, URIRef):
            entity_uri = URIRef(entity_uri)

        # Step 1: Find all snapshots for the entity
        snapshots = self._get_entity_snapshots(entity_uri)
        if not snapshots:
            self.logger.warning(f"No snapshots found for entity {entity_uri}")
            return False

        # Sort snapshots by generation time, converting strings to datetime objects
        sorted_snapshots = sorted(
            snapshots, key=lambda x: convert_to_datetime(x["generation_time"])
        )

        # Keep only the first snapshot
        first_snapshot = sorted_snapshots[0]
        snapshots_to_delete = sorted_snapshots[1:]

        if not snapshots_to_delete:
            self.logger.info(f"Entity {entity_uri} has only one snapshot, nothing to reset")
            # Still remove invalidatedAtTime from the first snapshot, and report
            # whether that update actually went through.
            return self._remove_invalidated_time(first_snapshot)

        # Step 2: Delete all snapshots after the first one
        if not self._delete_snapshots(snapshots_to_delete):
            return False

        # Step 3: Reset the provenance counter for this entity
        self._reset_provenance_counter(entity_uri)

        # Step 4: Remove invalidatedAtTime from the first snapshot. A failure
        # here must not be reported as a successful reset.
        if not self._remove_invalidated_time(first_snapshot):
            return False

        self.logger.info(f"Successfully reset provenance for entity {entity_uri}")
        return True

    def _get_entity_snapshots(self, entity_uri: URIRef) -> list:
        """
        Get all snapshots for a specific entity.

        Args:
            entity_uri: The URI of the entity

        Returns:
            list: A list of dictionaries with the keys "uri" and
            "generation_time", both taken verbatim from the query bindings
        """
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            SELECT ?snapshot ?generation_time
            WHERE {{
                GRAPH ?g {{
                    ?snapshot prov:specializationOf <{entity_uri}> ;
                              prov:generatedAtTime ?generation_time .
                }}
            }}
            ORDER BY ?generation_time
        """

        self.provenance_sparql.setQuery(query)
        results = self.provenance_sparql.queryAndConvert()

        return [
            {
                "uri": binding["snapshot"]["value"],
                "generation_time": binding["generation_time"]["value"],
            }
            for binding in results["results"]["bindings"]
        ]

    @staticmethod
    def _graph_uri_for(snapshot_uri: str) -> str:
        """Return the provenance graph URI: the part before '/prov/se/' plus '/prov/'."""
        return f"{snapshot_uri.split('/prov/se/')[0]}/prov/"

    def _run_update(self, query: str) -> None:
        """Send a SPARQL update to the provenance endpoint using POST."""
        self.provenance_sparql.setQuery(query)
        self.provenance_sparql.method = "POST"
        self.provenance_sparql.query()

    def _delete_snapshots(self, snapshots: list) -> bool:
        """
        Delete a list of snapshots from the provenance database.

        Every snapshot is attempted even if an earlier one fails; failures are
        logged and reflected in the return value.

        Args:
            snapshots: A list of snapshot dictionaries to delete

        Returns:
            bool: True if every snapshot was deleted, False if any deletion failed
        """
        if not snapshots:
            return True

        # Virtuoso has limitations with DELETE WHERE queries
        # We need to delete each snapshot individually
        success = True
        for snapshot in snapshots:
            snapshot_uri = snapshot['uri']
            graph_uri = self._graph_uri_for(snapshot_uri)

            # Delete all triples where the snapshot is the subject
            as_subject = f"""
                PREFIX prov: <http://www.w3.org/ns/prov#>

                DELETE {{
                    GRAPH <{graph_uri}> {{
                        <{snapshot_uri}> ?p ?o .
                    }}
                }}
                WHERE {{
                    GRAPH <{graph_uri}> {{
                        <{snapshot_uri}> ?p ?o .
                    }}
                }}
            """

            # Also delete triples where the snapshot is the object
            as_object = f"""
                PREFIX prov: <http://www.w3.org/ns/prov#>

                DELETE {{
                    GRAPH <{graph_uri}> {{
                        ?s ?p <{snapshot_uri}> .
                    }}
                }}
                WHERE {{
                    GRAPH <{graph_uri}> {{
                        ?s ?p <{snapshot_uri}> .
                    }}
                }}
            """

            try:
                self._run_update(as_subject)
                self._run_update(as_object)
                self.logger.debug(f"Successfully deleted snapshot: {snapshot_uri} from graph: {graph_uri}")
            except Exception as e:
                self.logger.error(f"Error deleting snapshot {snapshot_uri}: {e}")
                success = False

        return success

    def _reset_provenance_counter(self, entity_uri: URIRef) -> None:
        """
        Reset the provenance counter for a specific entity to 1.

        Args:
            entity_uri: The URI of the entity
        """
        # The counter is keyed by the last path segment of the entity URI
        parsed_uri = urlparse(str(entity_uri))
        entity_name = parsed_uri.path.split('/')[-1]

        # Set the counter to 1 (for the first snapshot)
        self.counter_handler.set_counter(1, entity_name)
        self.logger.info(f"Reset provenance counter for entity {entity_uri} to 1")

    def _remove_invalidated_time(self, snapshot: dict) -> bool:
        """
        Remove the invalidatedAtTime property from a snapshot.

        Args:
            snapshot: A dictionary containing snapshot information; only its
                "uri" entry is read

        Returns:
            bool: True if the update was sent without error, False otherwise
        """
        snapshot_uri = snapshot['uri']
        graph_uri = self._graph_uri_for(snapshot_uri)

        # Delete the invalidatedAtTime property
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            DELETE {{
                GRAPH <{graph_uri}> {{
                    <{snapshot_uri}> prov:invalidatedAtTime ?time .
                }}
            }}
            WHERE {{
                GRAPH <{graph_uri}> {{
                    <{snapshot_uri}> prov:invalidatedAtTime ?time .
                }}
            }}
        """

        try:
            self._run_update(query)
            self.logger.info(f"Successfully removed invalidatedAtTime from snapshot: {snapshot_uri}")
            return True
        except Exception as e:
            self.logger.error(f"Error removing invalidatedAtTime from snapshot {snapshot_uri}: {e}")
            return False

251 

252 

def reset_entity_provenance(
    entity_uri: str,
    provenance_endpoint: str,
    counter_handler: CounterHandler,
) -> bool:
    """
    Convenience wrapper: build a ProvenanceResetter and reset one entity.

    The reset deletes all snapshots after snapshot 1, removes the
    invalidatedAtTime property from the first snapshot, and resets the
    provenance counters.

    Args:
        entity_uri: The URI of the entity to reset
        provenance_endpoint: The SPARQL endpoint for the provenance database
        counter_handler: An instance of a CounterHandler to manage provenance counters

    Returns:
        bool: True if the operation was successful, False otherwise
    """
    return ProvenanceResetter(
        provenance_endpoint=provenance_endpoint,
        counter_handler=counter_handler,
    ).reset_entity_provenance(entity_uri)

277 

278 

def load_config(config_path):
    """
    Load configuration from a Python file.

    The file is executed as a module named "config". Any failure (path that
    cannot be loaded as a Python module, missing file, error raised while the
    file executes) is logged and terminates the process with exit status 1.

    Args:
        config_path: Path to the configuration file

    Returns:
        module: The loaded configuration module
    """
    try:
        spec = importlib.util.spec_from_file_location("config", config_path)
        # No spec/loader is produced for paths Python cannot treat as a module
        # (e.g. an unrecognised file suffix); report that plainly instead of
        # letting it surface as an AttributeError on None.
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot load '{config_path}' as a Python module")
        config = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(config)
        return config
    except Exception as e:
        logging.error(f"Error loading configuration file: {e}")
        sys.exit(1)

297 

298 

def main():
    """
    Command-line entry point.

    Parses the entity URI and config path, loads the config module, checks
    that it exposes ``Config.PROVENANCE_DB_URL`` and ``Config.COUNTER_HANDLER``
    and runs the reset.

    Returns:
        int: 0 when the reset succeeded, 1 on a configuration problem or a
        failed reset
    """
    cli = argparse.ArgumentParser(description="Reset the provenance of a specific entity")
    cli.add_argument("entity_uri", help="URI of the entity to reset")
    cli.add_argument("--config", "-c", required=True, help="Path to the configuration file")
    cli.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
    options = cli.parse_args()

    # Verbose mode lowers the threshold to DEBUG; otherwise INFO
    logging.basicConfig(
        level=logging.DEBUG if options.verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    config = load_config(options.config)

    if not hasattr(config, "Config"):
        logging.error("Configuration file must define a Config class")
        return 1

    # Required Config attributes, checked and read in this order
    required = (
        ("PROVENANCE_DB_URL", "Config class must define PROVENANCE_DB_URL"),
        ("COUNTER_HANDLER", "Config class must define COUNTER_HANDLER"),
    )
    settings = {}
    for attribute, complaint in required:
        if not hasattr(config.Config, attribute):
            logging.error(complaint)
            return 1
        settings[attribute] = getattr(config.Config, attribute)

    succeeded = reset_entity_provenance(
        entity_uri=options.entity_uri,
        provenance_endpoint=settings["PROVENANCE_DB_URL"],
        counter_handler=settings["COUNTER_HANDLER"],
    )

    if not succeeded:
        logging.error(f"Failed to reset provenance for entity {options.entity_uri}")
        return 1

    logging.info(f"Successfully reset provenance for entity {options.entity_uri}")
    return 0

349 

350 

# Script entry: the integer returned by main() becomes the process exit status.
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())