Coverage for heritrace / scripts / reset_provenance.py: 98%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import importlib.util 

7import logging 

8import sys 

9import types 

10from datetime import datetime, timezone 

11from urllib.parse import urlparse 

12 

13from rdflib import URIRef 

14from rdflib_ocdm.counter_handler.counter_handler import CounterHandler 

15from SPARQLWrapper import JSON 

16from SPARQLWrapper.SPARQLExceptions import SPARQLWrapperException 

17 

18from heritrace.sparql import SPARQLWrapperWithRetry, get_sparql_bindings 

19from heritrace.utils.converters import convert_to_datetime 

20 

21logger = logging.getLogger(__name__) 

22 

23 

24class ProvenanceResetter: 

25 """ 

26 A class to reset the provenance of a specific entity by deleting all snapshots 

27 after snapshot 1 and resetting the provenance counters. 

28 """ 

29 

30 def __init__( 

31 self, 

32 provenance_endpoint: str, 

33 counter_handler: CounterHandler, 

34 ) -> None: 

35 """ 

36 Initialize the ProvenanceResetter. 

37 

38 Args: 

39 provenance_endpoint: The SPARQL endpoint for the provenance database 

40 counter_handler: An instance of a CounterHandler to manage provenance 

41 counters 

42 """ 

43 self.provenance_endpoint = provenance_endpoint 

44 self.provenance_sparql = SPARQLWrapperWithRetry(provenance_endpoint) 

45 self.provenance_sparql.setReturnFormat(JSON) 

46 self.counter_handler = counter_handler 

47 self.logger = logging.getLogger(__name__) 

48 

49 def reset_entity_provenance(self, entity_uri: URIRef) -> bool: 

50 

51 # Step 1: Find all snapshots for the entity 

52 snapshots = self.get_entity_snapshots(entity_uri) 

53 if not snapshots: 

54 self.logger.warning("No snapshots found for entity %s", entity_uri) 

55 return False 

56 

57 # Sort snapshots by generation time, converting strings to datetime objects 

58 sorted_snapshots = sorted( 

59 snapshots, 

60 key=lambda x: ( 

61 convert_to_datetime(x["generation_time"]) 

62 or datetime.min.replace(tzinfo=timezone.utc) 

63 ), 

64 ) 

65 

66 # Keep only the first snapshot 

67 first_snapshot = sorted_snapshots[0] 

68 snapshots_to_delete = sorted_snapshots[1:] 

69 

70 if not snapshots_to_delete: 

71 self.logger.info( 

72 "Entity %s has only one snapshot, nothing to reset", entity_uri 

73 ) 

74 # Still remove invalidatedAtTime from the first snapshot 

75 self.remove_invalidated_time(first_snapshot) 

76 return True 

77 

78 # Step 2: Delete all snapshots after the first one 

79 success = self.delete_snapshots(snapshots_to_delete) 

80 if not success: 

81 return False 

82 

83 # Step 3: Reset the provenance counter for this entity 

84 self.reset_provenance_counter(entity_uri) 

85 

86 # Step 4: Remove invalidatedAtTime from the first snapshot 

87 self.remove_invalidated_time(first_snapshot) 

88 

89 self.logger.info("Successfully reset provenance for entity %s", entity_uri) 

90 return True 

91 

92 def get_entity_snapshots(self, entity_uri: URIRef) -> list: 

93 """ 

94 Get all snapshots for a specific entity. 

95 

96 Args: 

97 entity_uri: The URI of the entity 

98 

99 Returns: 

100 list: A list of dictionaries containing snapshot information 

101 """ 

102 query = f""" 

103 PREFIX prov: <http://www.w3.org/ns/prov#> 

104 

105 SELECT ?snapshot ?generation_time 

106 WHERE {{ 

107 GRAPH ?g {{ 

108 ?snapshot prov:specializationOf <{entity_uri}> ; 

109 prov:generatedAtTime ?generation_time . 

110 }} 

111 }} 

112 ORDER BY ?generation_time 

113 """ 

114 

115 self.provenance_sparql.setQuery(query) 

116 bindings = get_sparql_bindings(self.provenance_sparql.queryAndConvert()) 

117 

118 return [ 

119 { 

120 "uri": binding["snapshot"]["value"], 

121 "generation_time": binding["generation_time"]["value"], 

122 } 

123 for binding in bindings 

124 ] 

125 

126 def delete_snapshots(self, snapshots: list) -> bool: 

127 """ 

128 Delete a list of snapshots from the provenance database. 

129 

130 Args: 

131 snapshots: A list of snapshot dictionaries to delete 

132 

133 Returns: 

134 bool: True if the operation was successful, False otherwise 

135 """ 

136 if not snapshots: 

137 return True 

138 

139 # Virtuoso has limitations with DELETE WHERE queries 

140 # We need to delete each snapshot individually 

141 success = True 

142 for snapshot in snapshots: 

143 snapshot_uri = snapshot["uri"] 

144 

145 # Construct the graph name based on the snapshot URI 

146 # The graph name follows the pattern: snapshot_uri/prov/ 

147 graph_uri = f"{snapshot_uri.split('/prov/se/')[0]}/prov/" 

148 

149 # Delete all triples where the snapshot is the subject 

150 query = f""" 

151 PREFIX prov: <http://www.w3.org/ns/prov#> 

152 

153 DELETE {{ 

154 GRAPH <{graph_uri}> {{ 

155 <{snapshot_uri}> ?p ?o . 

156 }} 

157 }} 

158 WHERE {{ 

159 GRAPH <{graph_uri}> {{ 

160 <{snapshot_uri}> ?p ?o . 

161 }} 

162 }} 

163 """ 

164 

165 try: 

166 self.provenance_sparql.setQuery(query) 

167 self.provenance_sparql.method = "POST" 

168 self.provenance_sparql.query() 

169 

170 # Also delete triples where the snapshot is the object 

171 query = f""" 

172 PREFIX prov: <http://www.w3.org/ns/prov#> 

173 

174 DELETE {{ 

175 GRAPH <{graph_uri}> {{ 

176 ?s ?p <{snapshot_uri}> . 

177 }} 

178 }} 

179 WHERE {{ 

180 GRAPH <{graph_uri}> {{ 

181 ?s ?p <{snapshot_uri}> . 

182 }} 

183 }} 

184 """ 

185 

186 self.provenance_sparql.setQuery(query) 

187 self.provenance_sparql.query() 

188 

189 self.logger.debug( 

190 "Successfully deleted snapshot: %s from graph: %s", 

191 snapshot_uri, 

192 graph_uri, 

193 ) 

194 except SPARQLWrapperException: 

195 self.logger.exception("Error deleting snapshot %s", snapshot_uri) 

196 success = False 

197 

198 return success 

199 

200 def reset_provenance_counter(self, entity_uri: URIRef) -> None: 

201 """ 

202 Reset the provenance counter for a specific entity to 1. 

203 

204 Args: 

205 entity_uri: The URI of the entity 

206 """ 

207 # Extract the entity name from the URI 

208 parsed_uri = urlparse(str(entity_uri)) 

209 entity_name = parsed_uri.path.split("/")[-1] 

210 

211 # Set the counter to 1 (for the first snapshot) 

212 self.counter_handler.set_counter(1, entity_name) 

213 self.logger.info("Reset provenance counter for entity %s to 1", entity_uri) 

214 

215 def remove_invalidated_time(self, snapshot: dict) -> bool: 

216 """ 

217 Remove the invalidatedAtTime property from a snapshot. 

218 

219 Args: 

220 snapshot: A dictionary containing snapshot information 

221 

222 Returns: 

223 bool: True if the operation was successful, False otherwise 

224 """ 

225 snapshot_uri = snapshot["uri"] 

226 

227 # Construct the graph name based on the snapshot URI 

228 graph_uri = f"{snapshot_uri.split('/prov/se/')[0]}/prov/" 

229 

230 # Delete the invalidatedAtTime property 

231 query = f""" 

232 PREFIX prov: <http://www.w3.org/ns/prov#> 

233 

234 DELETE {{ 

235 GRAPH <{graph_uri}> {{ 

236 <{snapshot_uri}> prov:invalidatedAtTime ?time . 

237 }} 

238 }} 

239 WHERE {{ 

240 GRAPH <{graph_uri}> {{ 

241 <{snapshot_uri}> prov:invalidatedAtTime ?time . 

242 }} 

243 }} 

244 """ 

245 

246 try: 

247 self.provenance_sparql.setQuery(query) 

248 self.provenance_sparql.method = "POST" 

249 self.provenance_sparql.query() 

250 self.logger.info( 

251 "Successfully removed invalidatedAtTime from snapshot: %s", 

252 snapshot_uri, 

253 ) 

254 except SPARQLWrapperException: 

255 self.logger.exception( 

256 "Error removing invalidatedAtTime from snapshot %s", snapshot_uri 

257 ) 

258 return False 

259 else: 

260 return True 

261 

262 

263def reset_entity_provenance( 

264 entity_uri: URIRef, 

265 provenance_endpoint: str, 

266 counter_handler: CounterHandler, 

267) -> bool: 

268 resetter = ProvenanceResetter( 

269 provenance_endpoint=provenance_endpoint, 

270 counter_handler=counter_handler, 

271 ) 

272 

273 return resetter.reset_entity_provenance(entity_uri) 

274 

275 

276def load_config(config_path: str) -> types.ModuleType: 

277 """ 

278 Load configuration from a Python file. 

279 

280 Args: 

281 config_path: Path to the configuration file 

282 

283 Returns: 

284 module: The loaded configuration module 

285 """ 

286 try: 

287 spec = importlib.util.spec_from_file_location("config", config_path) 

288 if spec is None or spec.loader is None: 

289 logger.error("Failed to create module spec from %s", config_path) 

290 sys.exit(1) 

291 config = importlib.util.module_from_spec(spec) 

292 spec.loader.exec_module(config) 

293 except SystemExit: 

294 raise 

295 except (FileNotFoundError, ImportError, AttributeError): 

296 logger.exception("Error loading configuration file: %s", config_path) 

297 sys.exit(1) 

298 else: 

299 return config 

300 

301 

302def main() -> int: 

303 parser = argparse.ArgumentParser( 

304 description="Reset the provenance of a specific entity" 

305 ) 

306 parser.add_argument("entity_uri", help="URI of the entity to reset") 

307 parser.add_argument( 

308 "--config", "-c", required=True, help="Path to the configuration file" 

309 ) 

310 parser.add_argument( 

311 "--verbose", "-v", action="store_true", help="Enable verbose logging" 

312 ) 

313 

314 args = parser.parse_args() 

315 

316 # Setup logging 

317 log_level = logging.DEBUG if args.verbose else logging.INFO 

318 logging.basicConfig( 

319 level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 

320 ) 

321 

322 # Load configuration 

323 config = load_config(args.config) 

324 

325 # Check if Config class exists 

326 if not hasattr(config, "Config"): 

327 logger.error("Configuration file must define a Config class") 

328 return 1 

329 

330 # Get required configuration from Config class 

331 if not hasattr(config.Config, "PROVENANCE_DB_URL"): 

332 logger.error("Config class must define PROVENANCE_DB_URL") 

333 return 1 

334 

335 provenance_endpoint = config.Config.PROVENANCE_DB_URL 

336 

337 # Get counter handler from Config class 

338 if not hasattr(config.Config, "COUNTER_HANDLER"): 

339 logger.error("Config class must define COUNTER_HANDLER") 

340 return 1 

341 

342 counter_handler = config.Config.COUNTER_HANDLER 

343 

344 success = reset_entity_provenance( 

345 entity_uri=URIRef(args.entity_uri), 

346 provenance_endpoint=provenance_endpoint, 

347 counter_handler=counter_handler, 

348 ) 

349 

350 if success: 

351 logger.info("Successfully reset provenance for entity %s", args.entity_uri) 

352 return 0 

353 logger.error("Failed to reset provenance for entity %s", args.entity_uri) 

354 return 1 

355 

356 

357if __name__ == "__main__": # pragma: no cover 

358 sys.exit(main())