Coverage for heritrace / scripts / reset_provenance.py: 100%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-21 12:56 +0000

1#!/usr/bin/env python3 

2 

3# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7import argparse 

8import importlib.util 

9import logging 

10import sys 

11from typing import Union 

12from urllib.parse import urlparse 

13 

14from heritrace.extensions import SPARQLWrapperWithRetry 

15from heritrace.utils.converters import convert_to_datetime 

16from rdflib import URIRef 

17from rdflib_ocdm.counter_handler.counter_handler import CounterHandler 

18from SPARQLWrapper import JSON 

19 

20 

class ProvenanceResetter:
    """
    A class to reset the provenance of a specific entity by deleting all snapshots
    after the first one, removing the invalidatedAtTime property from the first
    snapshot, and resetting the provenance counter.
    """

    def __init__(
        self,
        provenance_endpoint: str,
        counter_handler: CounterHandler,
    ):
        """
        Initialize the ProvenanceResetter.

        Args:
            provenance_endpoint: The SPARQL endpoint for the provenance database
            counter_handler: An instance of a CounterHandler to manage provenance counters
        """
        self.provenance_endpoint = provenance_endpoint
        self.provenance_sparql = SPARQLWrapperWithRetry(provenance_endpoint)
        self.provenance_sparql.setReturnFormat(JSON)
        self.counter_handler = counter_handler
        self.logger = logging.getLogger(__name__)

    def reset_entity_provenance(self, entity_uri: Union[str, URIRef]) -> bool:
        """
        Reset the provenance of a specific entity by deleting all snapshots
        after the first one, removing the invalidatedAtTime property from the
        first snapshot, and resetting the provenance counter.

        Args:
            entity_uri: The URI of the entity to reset

        Returns:
            bool: True if every step succeeded. False if the entity has no
            snapshots, if any snapshot could not be deleted, or if the
            invalidatedAtTime property could not be removed from the first
            snapshot.
        """
        if not isinstance(entity_uri, URIRef):
            entity_uri = URIRef(entity_uri)

        # Step 1: Find all snapshots for the entity
        snapshots = self._get_entity_snapshots(entity_uri)
        if not snapshots:
            self.logger.warning(f"No snapshots found for entity {entity_uri}")
            return False

        # Sort snapshots by generation time, converting strings to datetime objects
        sorted_snapshots = sorted(
            snapshots, key=lambda x: convert_to_datetime(x["generation_time"])
        )

        # Keep only the earliest snapshot; everything after it is removed
        first_snapshot, *snapshots_to_delete = sorted_snapshots

        if not snapshots_to_delete:
            self.logger.info(f"Entity {entity_uri} has only one snapshot, nothing to reset")
            # Still remove invalidatedAtTime from the first snapshot, and
            # report a failure of that update instead of hiding it
            if not self._remove_invalidated_time(first_snapshot):
                return False
            return True

        # Step 2: Delete all snapshots after the first one
        if not self._delete_snapshots(snapshots_to_delete):
            return False

        # Step 3: Reset the provenance counter for this entity
        self._reset_provenance_counter(entity_uri)

        # Step 4: Remove invalidatedAtTime from the first snapshot. A failure
        # here leaves the first snapshot still marked as invalidated, so the
        # reset must not be reported as successful.
        if not self._remove_invalidated_time(first_snapshot):
            return False

        self.logger.info(f"Successfully reset provenance for entity {entity_uri}")
        return True

    def _get_entity_snapshots(self, entity_uri: URIRef) -> list:
        """
        Get all snapshots for a specific entity.

        Args:
            entity_uri: The URI of the entity

        Returns:
            list: A list of dictionaries with the keys "uri" (snapshot URI)
            and "generation_time" (the prov:generatedAtTime value as returned
            by the endpoint)
        """
        query = f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            SELECT ?snapshot ?generation_time
            WHERE {{
                GRAPH ?g {{
                    ?snapshot prov:specializationOf <{entity_uri}> ;
                        prov:generatedAtTime ?generation_time .
                }}
            }}
            ORDER BY ?generation_time
        """

        self.provenance_sparql.setQuery(query)
        results = self.provenance_sparql.queryAndConvert()

        return [
            {
                "uri": binding["snapshot"]["value"],
                "generation_time": binding["generation_time"]["value"],
            }
            for binding in results["results"]["bindings"]
        ]

    @staticmethod
    def _graph_uri_for(snapshot_uri: str) -> str:
        """
        Derive the provenance graph URI from a snapshot URI.

        The graph is the part of the snapshot URI that precedes '/prov/se/',
        followed by '/prov/'.
        """
        return f"{snapshot_uri.split('/prov/se/')[0]}/prov/"

    @staticmethod
    def _build_delete_query(graph_uri: str, triple_pattern: str) -> str:
        """Build a SPARQL update deleting every match of a triple pattern in a graph."""
        return f"""
            PREFIX prov: <http://www.w3.org/ns/prov#>

            DELETE {{
                GRAPH <{graph_uri}> {{
                    {triple_pattern}
                }}
            }}
            WHERE {{
                GRAPH <{graph_uri}> {{
                    {triple_pattern}
                }}
            }}
        """

    def _run_update(self, query: str) -> None:
        """Send a SPARQL update to the provenance endpoint using POST."""
        self.provenance_sparql.setQuery(query)
        self.provenance_sparql.method = "POST"
        self.provenance_sparql.query()

    def _delete_snapshots(self, snapshots: list) -> bool:
        """
        Delete a list of snapshots from the provenance database.

        Deletion continues after a failure so that as many snapshots as
        possible are removed; each failure is logged.

        Args:
            snapshots: A list of snapshot dictionaries to delete

        Returns:
            bool: True if every snapshot was deleted, False if any deletion failed
        """
        if not snapshots:
            return True

        # Virtuoso has limitations with DELETE WHERE queries
        # We need to delete each snapshot individually
        success = True
        for snapshot in snapshots:
            snapshot_uri = snapshot["uri"]
            graph_uri = self._graph_uri_for(snapshot_uri)

            try:
                # Delete all triples where the snapshot is the subject
                self._run_update(
                    self._build_delete_query(graph_uri, f"<{snapshot_uri}> ?p ?o .")
                )
                # Also delete triples where the snapshot is the object
                self._run_update(
                    self._build_delete_query(graph_uri, f"?s ?p <{snapshot_uri}> .")
                )
                self.logger.debug(f"Successfully deleted snapshot: {snapshot_uri} from graph: {graph_uri}")
            except Exception as e:
                self.logger.error(f"Error deleting snapshot {snapshot_uri}: {e}")
                success = False

        return success

    def _reset_provenance_counter(self, entity_uri: URIRef) -> None:
        """
        Reset the provenance counter for a specific entity to 1.

        Args:
            entity_uri: The URI of the entity
        """
        # The counter is keyed by the last path segment of the entity URI
        entity_name = urlparse(str(entity_uri)).path.split("/")[-1]

        # Set the counter to 1 (for the first snapshot)
        self.counter_handler.set_counter(1, entity_name)
        self.logger.info(f"Reset provenance counter for entity {entity_uri} to 1")

    def _remove_invalidated_time(self, snapshot: dict) -> bool:
        """
        Remove the invalidatedAtTime property from a snapshot.

        Args:
            snapshot: A dictionary containing snapshot information (the "uri" key is read)

        Returns:
            bool: True if the operation was successful, False otherwise
        """
        snapshot_uri = snapshot["uri"]
        query = self._build_delete_query(
            self._graph_uri_for(snapshot_uri),
            f"<{snapshot_uri}> prov:invalidatedAtTime ?time .",
        )

        try:
            self._run_update(query)
            self.logger.info(f"Successfully removed invalidatedAtTime from snapshot: {snapshot_uri}")
            return True
        except Exception as e:
            self.logger.error(f"Error removing invalidatedAtTime from snapshot {snapshot_uri}: {e}")
            return False

255 

256 

def reset_entity_provenance(
    entity_uri: str,
    provenance_endpoint: str,
    counter_handler: CounterHandler,
) -> bool:
    """
    Convenience wrapper that builds a ProvenanceResetter and runs it once.

    The reset deletes all snapshots after the first one, removes the
    invalidatedAtTime property from the first snapshot, and resets the
    provenance counter.

    Args:
        entity_uri: The URI of the entity to reset
        provenance_endpoint: The SPARQL endpoint for the provenance database
        counter_handler: An instance of a CounterHandler to manage provenance counters

    Returns:
        bool: The result of ProvenanceResetter.reset_entity_provenance for the entity
    """
    return ProvenanceResetter(
        provenance_endpoint=provenance_endpoint,
        counter_handler=counter_handler,
    ).reset_entity_provenance(entity_uri)

281 

282 

def load_config(config_path):
    """
    Load configuration from a Python file.

    Any failure (unsupported file type, missing file, error raised while
    executing the file) is logged and terminates the process with exit code 1.

    Args:
        config_path: Path to the configuration file

    Returns:
        module: The loaded configuration module
    """
    try:
        spec = importlib.util.spec_from_file_location("config", config_path)
        # spec_from_file_location returns None when no loader matches the
        # path (e.g. a file without a Python source extension); fail with a
        # message naming the path instead of an AttributeError on None
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot create an import spec for '{config_path}'")
        config = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(config)
        return config
    except Exception as e:
        logging.error(f"Error loading configuration file: {e}")
        sys.exit(1)

301 

302 

def main():
    """
    Command-line entry point: parse arguments, load the configuration and reset
    the provenance of the requested entity.

    Returns:
        int: 0 when the reset succeeded, 1 when the configuration is
        incomplete or the reset failed
    """
    parser = argparse.ArgumentParser(description="Reset the provenance of a specific entity")
    parser.add_argument("entity_uri", help="URI of the entity to reset")
    parser.add_argument("--config", "-c", required=True, help="Path to the configuration file")
    parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
    args = parser.parse_args()

    # Verbose mode lowers the threshold to DEBUG
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    config = load_config(args.config)

    # The configuration module must expose a Config class...
    if not hasattr(config, "Config"):
        logging.error("Configuration file must define a Config class")
        return 1
    settings = config.Config

    # ...carrying the provenance endpoint...
    if not hasattr(settings, "PROVENANCE_DB_URL"):
        logging.error("Config class must define PROVENANCE_DB_URL")
        return 1
    endpoint = settings.PROVENANCE_DB_URL

    # ...and the counter handler
    if not hasattr(settings, "COUNTER_HANDLER"):
        logging.error("Config class must define COUNTER_HANDLER")
        return 1
    handler = settings.COUNTER_HANDLER

    succeeded = reset_entity_provenance(
        entity_uri=args.entity_uri,
        provenance_endpoint=endpoint,
        counter_handler=handler,
    )

    if not succeeded:
        logging.error(f"Failed to reset provenance for entity {args.entity_uri}")
        return 1

    logging.info(f"Successfully reset provenance for entity {args.entity_uri}")
    return 0

353 

354 

# Run the CLI when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())