Coverage for heritrace/scripts/reset_provenance.py: 100%

122 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-18 11:10 +0000

1#!/usr/bin/env python3 

2 

3import argparse 

4import importlib.util 

5import logging 

6import sys 

7from typing import Union 

8from urllib.parse import urlparse 

9 

10from rdflib import URIRef 

11from SPARQLWrapper import JSON, SPARQLWrapper 

12from rdflib_ocdm.counter_handler.counter_handler import CounterHandler 

13from heritrace.utils.converters import convert_to_datetime 

14 

15 

class ProvenanceResetter:
    """
    Reset the provenance of a single entity.

    Every snapshot generated after the earliest one is deleted from the
    provenance store, the ``prov:invalidatedAtTime`` statement is removed
    from the surviving snapshot, and the entity's provenance counter is set
    back to 1.
    """

    def __init__(
        self,
        provenance_endpoint: str,
        counter_handler: CounterHandler,
    ):
        """
        Initialize the ProvenanceResetter.

        Args:
            provenance_endpoint: The SPARQL endpoint for the provenance database
            counter_handler: An instance of a CounterHandler to manage provenance counters
        """
        self.provenance_endpoint = provenance_endpoint
        self.provenance_sparql = SPARQLWrapper(provenance_endpoint)
        self.provenance_sparql.setReturnFormat(JSON)
        self.counter_handler = counter_handler
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _graph_uri_for(snapshot_uri: str) -> str:
        """
        Derive the provenance graph URI that holds a snapshot.

        The graph is named after the part of the snapshot URI that precedes
        ``/prov/se/``, followed by ``/prov/``.

        Args:
            snapshot_uri: The URI of the snapshot

        Returns:
            str: The URI of the named graph used in the update queries
        """
        return f"{snapshot_uri.split('/prov/se/')[0]}/prov/"

    def reset_entity_provenance(self, entity_uri: Union[str, URIRef]) -> bool:
        """
        Reset the provenance of a specific entity by deleting all snapshots
        after snapshot 1, removing the invalidatedAtTime property from the first snapshot,
        and resetting the provenance counters.

        Args:
            entity_uri: The URI of the entity to reset

        Returns:
            bool: True only if every step succeeded. False if the entity has
            no snapshots, if any snapshot could not be deleted, or if the
            invalidatedAtTime property could not be removed from the first
            snapshot.
        """
        if not isinstance(entity_uri, URIRef):
            entity_uri = URIRef(entity_uri)

        # Step 1: Find all snapshots for the entity
        snapshots = self._get_entity_snapshots(entity_uri)
        if not snapshots:
            self.logger.warning(f"No snapshots found for entity {entity_uri}")
            return False

        # Sort snapshots by generation time, converting strings to datetime objects
        sorted_snapshots = sorted(
            snapshots, key=lambda x: convert_to_datetime(x["generation_time"])
        )

        # Keep only the first snapshot
        first_snapshot = sorted_snapshots[0]
        snapshots_to_delete = sorted_snapshots[1:]

        if not snapshots_to_delete:
            self.logger.info(f"Entity {entity_uri} has only one snapshot, nothing to reset")
            # Still remove invalidatedAtTime from the first snapshot, and
            # report a failure of that update instead of hiding it.
            return self._remove_invalidated_time(first_snapshot)

        # Step 2: Delete all snapshots after the first one
        if not self._delete_snapshots(snapshots_to_delete):
            return False

        # Step 3: Reset the provenance counter for this entity
        self._reset_provenance_counter(entity_uri)

        # Step 4: Remove invalidatedAtTime from the first snapshot. The
        # helper logs the error itself; here we only make sure a failed
        # update is not reported as a successful reset.
        if not self._remove_invalidated_time(first_snapshot):
            return False

        self.logger.info(f"Successfully reset provenance for entity {entity_uri}")
        return True

    def _get_entity_snapshots(self, entity_uri: URIRef) -> list:
        """
        Get all snapshots for a specific entity.

        Args:
            entity_uri: The URI of the entity

        Returns:
            list: One dictionary per snapshot with the keys ``uri`` and
            ``generation_time`` (both taken verbatim from the query results)
        """
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            SELECT ?snapshot ?generation_time
            WHERE {{
                GRAPH ?g {{
                    ?snapshot prov:specializationOf <{entity_uri}> ;
                              prov:generatedAtTime ?generation_time .
                }}
            }}
            ORDER BY ?generation_time
        """

        self.provenance_sparql.setQuery(query)
        results = self.provenance_sparql.queryAndConvert()

        return [
            {
                "uri": binding["snapshot"]["value"],
                "generation_time": binding["generation_time"]["value"],
            }
            for binding in results["results"]["bindings"]
        ]

    def _delete_snapshots(self, snapshots: list) -> bool:
        """
        Delete a list of snapshots from the provenance database.

        A failure on one snapshot is logged and does not stop the remaining
        snapshots from being processed.

        Args:
            snapshots: A list of snapshot dictionaries to delete

        Returns:
            bool: True if every snapshot was deleted, False if at least one
            deletion raised an error
        """
        if not snapshots:
            return True

        # Virtuoso has limitations with DELETE WHERE queries
        # We need to delete each snapshot individually
        success = True
        for snapshot in snapshots:
            snapshot_uri = snapshot["uri"]
            graph_uri = self._graph_uri_for(snapshot_uri)

            # Delete all triples where the snapshot is the subject
            subject_query = f"""
                PREFIX prov: <http://www.w3.org/ns/prov#>

                DELETE {{
                    GRAPH <{graph_uri}> {{
                        <{snapshot_uri}> ?p ?o .
                    }}
                }}
                WHERE {{
                    GRAPH <{graph_uri}> {{
                        <{snapshot_uri}> ?p ?o .
                    }}
                }}
            """

            # Also delete triples where the snapshot is the object
            object_query = f"""
                PREFIX prov: <http://www.w3.org/ns/prov#>

                DELETE {{
                    GRAPH <{graph_uri}> {{
                        ?s ?p <{snapshot_uri}> .
                    }}
                }}
                WHERE {{
                    GRAPH <{graph_uri}> {{
                        ?s ?p <{snapshot_uri}> .
                    }}
                }}
            """

            try:
                self.provenance_sparql.setQuery(subject_query)
                self.provenance_sparql.method = "POST"
                self.provenance_sparql.query()

                self.provenance_sparql.setQuery(object_query)
                self.provenance_sparql.query()

                self.logger.debug(f"Successfully deleted snapshot: {snapshot_uri} from graph: {graph_uri}")
            except Exception as e:
                self.logger.error(f"Error deleting snapshot {snapshot_uri}: {e}")
                success = False

        return success

    def _reset_provenance_counter(self, entity_uri: URIRef) -> None:
        """
        Reset the provenance counter for a specific entity to 1.

        Args:
            entity_uri: The URI of the entity
        """
        # The counter is keyed by the last path segment of the entity URI
        parsed_uri = urlparse(str(entity_uri))
        entity_name = parsed_uri.path.split("/")[-1]

        # Set the counter to 1 (for the first snapshot)
        self.counter_handler.set_counter(1, entity_name)
        self.logger.info(f"Reset provenance counter for entity {entity_uri} to 1")

    def _remove_invalidated_time(self, snapshot: dict) -> bool:
        """
        Remove the invalidatedAtTime property from a snapshot.

        Args:
            snapshot: A dictionary containing snapshot information; only its
                ``uri`` key is read

        Returns:
            bool: True if the update ran without raising, False otherwise
        """
        snapshot_uri = snapshot["uri"]
        graph_uri = self._graph_uri_for(snapshot_uri)

        # Delete the invalidatedAtTime property
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            DELETE {{
                GRAPH <{graph_uri}> {{
                    <{snapshot_uri}> prov:invalidatedAtTime ?time .
                }}
            }}
            WHERE {{
                GRAPH <{graph_uri}> {{
                    <{snapshot_uri}> prov:invalidatedAtTime ?time .
                }}
            }}
        """

        try:
            self.provenance_sparql.setQuery(query)
            self.provenance_sparql.method = "POST"
            self.provenance_sparql.query()
            self.logger.info(f"Successfully removed invalidatedAtTime from snapshot: {snapshot_uri}")
            return True
        except Exception as e:
            self.logger.error(f"Error removing invalidatedAtTime from snapshot {snapshot_uri}: {e}")
            return False

250 

251 

def reset_entity_provenance(
    entity_uri: str,
    provenance_endpoint: str,
    counter_handler: CounterHandler,
) -> bool:
    """
    Convenience wrapper that resets one entity's provenance in a single call.

    A ProvenanceResetter is built for the given endpoint and counter handler,
    and its reset_entity_provenance method is invoked for the entity.

    Args:
        entity_uri: The URI of the entity to reset
        provenance_endpoint: The SPARQL endpoint for the provenance database
        counter_handler: An instance of a CounterHandler to manage provenance counters

    Returns:
        bool: Whatever ProvenanceResetter.reset_entity_provenance returns
        (True on success, False otherwise)
    """
    return ProvenanceResetter(
        provenance_endpoint=provenance_endpoint,
        counter_handler=counter_handler,
    ).reset_entity_provenance(entity_uri)

276 

277 

def load_config(config_path):
    """
    Import a Python configuration file as a module.

    If anything goes wrong while locating or executing the file, the error is
    logged and the process exits with status 1.

    Args:
        config_path: Path to the configuration file

    Returns:
        module: The loaded configuration module
    """
    try:
        module_spec = importlib.util.spec_from_file_location("config", config_path)
        loaded_module = importlib.util.module_from_spec(module_spec)
        module_spec.loader.exec_module(loaded_module)
    except Exception as e:
        logging.error(f"Error loading configuration file: {e}")
        sys.exit(1)
    return loaded_module

296 

297 

def main():
    """
    Command-line entry point: parse arguments, load the config, run the reset.

    Returns:
        int: 0 when the provenance was reset; 1 when the configuration lacks
        the Config class or one of its required attributes, or when the reset
        itself reports failure
    """
    cli = argparse.ArgumentParser(description="Reset the provenance of a specific entity")
    cli.add_argument("entity_uri", help="URI of the entity to reset")
    cli.add_argument("--config", "-c", required=True, help="Path to the configuration file")
    cli.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
    options = cli.parse_args()

    # Verbose mode lowers the threshold to DEBUG; otherwise INFO is used
    logging.basicConfig(
        level=logging.DEBUG if options.verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    config_module = load_config(options.config)

    # The configuration file has to expose a Config class
    if not hasattr(config_module, "Config"):
        logging.error("Configuration file must define a Config class")
        return 1

    # Each required attribute is checked and then read, in this order
    settings = {}
    for required in ("PROVENANCE_DB_URL", "COUNTER_HANDLER"):
        if not hasattr(config_module.Config, required):
            logging.error(f"Config class must define {required}")
            return 1
        settings[required] = getattr(config_module.Config, required)

    if reset_entity_provenance(
        entity_uri=options.entity_uri,
        provenance_endpoint=settings["PROVENANCE_DB_URL"],
        counter_handler=settings["COUNTER_HANDLER"],
    ):
        logging.info(f"Successfully reset provenance for entity {options.entity_uri}")
        return 0

    logging.error(f"Failed to reset provenance for entity {options.entity_uri}")
    return 1

348 

349 

# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())