Coverage for meta_prov_fixer / update_db_from_issues.py: 0%

273 statements  

coverage.py v7.13.4, created at 2026-03-16 15:12 +0000

#!/usr/bin/env python3
"""
Script to apply SPARQL updates to the database for already detected issues.

This script reads issues from JSON-Lines files generated by dry-run mode
and executes the corresponding SPARQL UPDATE queries against the database.
"""


import os
import json
import time
import argparse
import logging
from pathlib import Path
from datetime import datetime, date
from typing import List, Tuple
from rdflib import URIRef, Literal
from rdflib.namespace import XSD
import traceback

from sparqlite import SPARQLClient, QueryError, EndpointError
from meta_prov_fixer.src import (
    FillerFixerFile,
    DateTimeFixerFile,
    MissingPrimSourceFixerFile,
    MultiPAFixerFile,
    MultiObjectFixerFile,
    sparql_update
)
from meta_prov_fixer.utils import batched, normalise_datetime, get_previous_meta_dump_uri
from meta_prov_fixer.virtuoso_watchdog import start_watchdog_thread


class SparqlUpdatesCheckpoint:
    """
    Checkpoint class for tracking progress in update_db_from_issues.py.

    Tracks which JSON-Lines files have been processed and how many issues
    of each type have been applied to the database.
    """

    def __init__(self, path: str):
        self.path = path
        self.state = None
        self.dirty = False
        self.load()

    def load(self):
        """Load checkpoint state from file if it exists."""
        if os.path.exists(self.path):
            with open(self.path, "r", encoding="utf-8") as f:
                self.state = json.load(f)
        else:
            self.state = None

    def _atomic_write(self, data: dict, retries: int = 5, delay: float = 0.05):
        """Atomically write data to the checkpoint file."""
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        for i in range(retries):
            try:
                os.replace(tmp_path, self.path)
                return
            except PermissionError:
                time.sleep(delay)
        raise PermissionError(f"Failed to replace checkpoint file after {retries} retries")

    def update_state(
        self,
        current_file: str,
        line_number: int,
        ff_applied: int,
        dt_applied: int,
        mps_applied: int,
        pa_applied: int,
        mo_applied: int
    ):
        """Update checkpoint state with current progress."""
        self.state = {
            "current_file": current_file,
            "line_number": line_number,
            "ff_applied": ff_applied,
            "dt_applied": dt_applied,
            "mps_applied": mps_applied,
            "pa_applied": pa_applied,
            "mo_applied": mo_applied,
            "timestamp": datetime.now().isoformat()
        }
        self.dirty = True

    def flush(self):
        """Flush checkpoint state to disk."""
        if self.dirty and self.state:
            self._atomic_write(self.state)
            self.dirty = False

    def get_resume_line(self, filename: str) -> int:
        """
        Get the line number to resume from for a given file.

        Returns 0 if there is no checkpoint or it refers to a different file;
        otherwise returns the checkpointed line number for that file.
        """
        if not self.state or self.state["current_file"] != filename:
            return 0
        return self.state["line_number"]

    def get_applied_counts(self) -> Tuple[int, int, int, int, int]:
        """Get counts of applied issues from checkpoint."""
        if not self.state:
            return (0, 0, 0, 0, 0)
        return (
            self.state["ff_applied"],
            self.state["dt_applied"],
            self.state["mps_applied"],
            self.state["pa_applied"],
            self.state["mo_applied"]
        )
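# For reference, the checkpoint written by SparqlUpdatesCheckpoint is a small
# JSON document mirroring the keys set in update_state(). An illustrative
# example (file name and counts are placeholder values):
#
# {
#   "current_file": "issues_000.jsonl",
#   "line_number": 1500,
#   "ff_applied": 1200,
#   "dt_applied": 340,
#   "mps_applied": 25,
#   "pa_applied": 10,
#   "mo_applied": 5,
#   "timestamp": "2026-03-16T15:12:00"
# }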

def stream_and_fix_on_db(
    issues_dir: str,
    endpoint: str,
    meta_dumps: List[Tuple[str, str]],
    failed_log_fp: str,
    chunk_size: int = 100,
    checkpoint: SparqlUpdatesCheckpoint = None,
    resume: bool = True
):
    """
    Stream JSON-Lines files and apply SPARQL updates in batches to the database.

    Args:
        issues_dir: Directory containing the JSON-Lines files storing detected issues
        endpoint: SPARQL endpoint URL for executing queries
        meta_dumps: Meta dumps register
        failed_log_fp: Path to the failed-queries log
        chunk_size: Number of issues per SPARQL update batch
        checkpoint: Checkpoint object for resuming
        resume: If True, use the checkpoint to skip already-processed data
    """
    # Avoid a mutable default argument with side effects: create the default
    # checkpoint only when the function is actually called.
    if checkpoint is None:
        checkpoint = SparqlUpdatesCheckpoint("sparql_update_db.checkpoint.json")

    client = SPARQLClient(endpoint)
    issues_dir = Path(issues_dir)
    jsonl_files = sorted(issues_dir.glob('*.jsonl'))
    logging.info(f"Found {len(jsonl_files)} JSON-Lines file(s)")
    logging.info(f"Checkpoint state at start: {checkpoint.state}")

    # Counters for statistics
    total_files = 0
    total_ff_issues = 0
    total_dt_issues = 0
    total_mps_issues = 0
    total_pa_issues = 0
    total_mo_issues = 0

    # Load applied counts from checkpoint if resuming
    if resume and checkpoint.state:
        total_ff_issues, total_dt_issues, total_mps_issues, total_pa_issues, total_mo_issues = checkpoint.get_applied_counts()
        logging.info("Resuming from checkpoint:")
        logging.info(f"  Filler issues already applied: {total_ff_issues}")
        logging.info(f"  DateTime issues already applied: {total_dt_issues}")
        logging.info(f"  Missing PS issues already applied: {total_mps_issues}")
        logging.info(f"  Multi PA issues already applied: {total_pa_issues}")
        logging.info(f"  Multi Object issues already applied: {total_mo_issues}")

    # Batch accumulators for each issue type
    ff_batch = []
    dt_batch = []
    mps_batch = []
    pa_batch = []
    mo_batch = []

    def flush_batches():
        """Flush all issue batches by applying the appropriate fixes to the database."""
        nonlocal total_ff_issues, total_dt_issues, total_mps_issues, total_pa_issues, total_mo_issues

        if ff_batch:
            apply_filler_issues(client, ff_batch, failed_log_fp, chunk_size)
            total_ff_issues += len(ff_batch)
            ff_batch.clear()

        if dt_batch:
            apply_datetime_issues(client, dt_batch, failed_log_fp, chunk_size)
            total_dt_issues += len(dt_batch)
            dt_batch.clear()

        if mps_batch:
            apply_missing_ps_issues(client, mps_batch, meta_dumps, failed_log_fp, chunk_size)
            total_mps_issues += len(mps_batch)
            mps_batch.clear()

        if pa_batch:
            apply_multi_pa_issues(client, pa_batch, failed_log_fp, chunk_size)
            total_pa_issues += len(pa_batch)
            pa_batch.clear()

        if mo_batch:
            apply_multi_object_issues(client, mo_batch, meta_dumps, failed_log_fp, chunk_size)
            total_mo_issues += len(mo_batch)
            mo_batch.clear()

    times_per_file = []

    logging.info(f"Processing JSON-Lines files in {issues_dir}...")

    # Process each file line by line
    for file_idx, jsonl_file in enumerate(jsonl_files):
        # Checkpoint-based file skipping
        resume_line = 0
        if resume and checkpoint.state:
            checkpointed_file = checkpoint.state["current_file"]

            # Skip files that come before the checkpointed file in sorted order
            if jsonl_file.name < checkpointed_file:
                # logging.debug(f"Skipping already completed: {jsonl_file.name}")
                continue
            # Resume from a specific line in the checkpointed file
            elif jsonl_file.name == checkpointed_file:
                resume_line = checkpoint.state["line_number"]
                logging.info(f"Resuming from file '{jsonl_file.name}', line {resume_line}")
            # Process all files after the checkpointed file normally
            else:
                logging.debug(f"Processing: {jsonl_file.name}")
        else:
            logging.debug(f"Processing: {jsonl_file.name}")

        start_time = time.time()
        # Keep line_num defined even if the file turns out to be empty
        line_num = 0

        with open(jsonl_file, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                # Skip lines we've already processed
                if resume and line_num <= resume_line:
                    continue

                line = line.strip()
                if not line:
                    continue

                try:
                    record = json.loads(line)
                    # Count files only when actually processing (not when skipping lines)
                    if line_num == 1 and resume_line == 0:
                        total_files += 1

                    # Filler issues: each item is [graph_uri, {'to_delete': [...], 'remaining_snapshots': [...]}]
                    ff_items = record.get('ff', [])
                    for item in ff_items:
                        if isinstance(item, list) and len(item) == 2:
                            ff_batch.append((item[0], {'to_delete': item[1]['to_delete'], 'remaining_snapshots': item[1]['remaining_snapshots']}))
                        else:
                            ff_batch.append(item)

                    # DateTime issues: each item is [graph_uri, snapshot_uri, predicate_uri, invalid_datetime]
                    dt_batch.extend([tuple(item) for item in record.get('dt', [])])

                    # Missing Primary Source issues: each item is [snapshot_uri, generation_time]
                    mps_batch.extend([tuple(item) for item in record.get('mps', [])])

                    # Multiple Processing Agent issues: each item is [graph_uri, snapshot_uri]
                    pa_batch.extend([tuple(item) for item in record.get('pa', [])])

                    # Multiple Object issues: each item is [graph_uri, generation_time]
                    mo_batch.extend([tuple(item) for item in record.get('mo', [])])

                    # Flush batches when they reach chunk_size
                    if (len(ff_batch) >= chunk_size or
                            len(dt_batch) >= chunk_size or
                            len(mps_batch) >= chunk_size or
                            len(pa_batch) >= chunk_size or
                            len(mo_batch) >= chunk_size):

                        flush_batches()

                        checkpoint.update_state(
                            jsonl_file.name,
                            line_num,
                            total_ff_issues,
                            total_dt_issues,
                            total_mps_issues,
                            total_pa_issues,
                            total_mo_issues
                        )

                except KeyboardInterrupt:
                    checkpoint.flush()
                    logging.error("KeyboardInterrupt")
                    client.close()
                    raise

                except Exception as e:
                    checkpoint.flush()
                    logging.error(e)
                    logging.error(traceback.format_exc())

        # Flush any remaining batches after each file
        flush_batches()

        elapsed_file = time.time() - start_time

        times_per_file.append(elapsed_file)

        logging.info(f"Finished processing: {jsonl_file.name} in {elapsed_file:.2f} seconds.")
        if file_idx % 5 == 0:
            avg_time = sum(times_per_file) / len(times_per_file)
            est_remaining = avg_time * (len(jsonl_files) - file_idx - 1)
            logging.info(f"Average time per file over the last {len(times_per_file)} file(s): {avg_time:.2f} seconds. Estimated time remaining: {est_remaining/3600:.2f} hours")
            times_per_file.clear()

        # Recreate the SPARQL client after every file to avoid pycurl memory issues
        # logging.debug("Recreating SPARQLClient to clear accumulated pycurl state.")
        client.close()
        client = SPARQLClient(endpoint)

        # Mark this file as completed in the checkpoint
        checkpoint.update_state(
            jsonl_file.name,
            line_num,  # Last line number
            total_ff_issues,
            total_dt_issues,
            total_mps_issues,
            total_pa_issues,
            total_mo_issues
        )
        checkpoint.flush()

    # Final flush to ensure all batches are applied
    flush_batches()

    # Final checkpoint update
    checkpoint.update_state(
        "completed",
        0,
        total_ff_issues,
        total_dt_issues,
        total_mps_issues,
        total_pa_issues,
        total_mo_issues
    )
    checkpoint.flush()
    client.close()

    # Log statistics
    logging.info("=" * 80)
    logging.info("Summary of issues applied:")
    logging.info(f"  Total files processed: {total_files}")
    logging.info(f"  Filler issues: {total_ff_issues}")
    logging.info(f"  DateTime issues: {total_dt_issues}")
    logging.info(f"  Missing Primary Source issues: {total_mps_issues}")
    logging.info(f"  Multiple Processing Agent issues: {total_pa_issues}")
    logging.info(f"  Multiple Object issues: {total_mo_issues}")
    logging.info(f"  Total: {total_ff_issues + total_dt_issues + total_mps_issues + total_pa_issues + total_mo_issues}")
    logging.info("=" * 80)


def apply_filler_issues(
    client: SPARQLClient,
    ff_issues: List[Tuple[str, dict]],
    failed_log_fp: str,
    chunk_size: int = 100
):
    """Apply filler fixer issues to the database."""
    if not ff_issues:
        return

    logging.debug(f"Applying {len(ff_issues)} filler issue(s)")

    for chunk in batched(ff_issues, chunk_size):
        for graph_uri, values in chunk:
            to_delete = [URIRef(u) for u in values['to_delete']]
            remaining = [URIRef(u) for u in values['remaining_snapshots']]

            rename_mapping = FillerFixerFile.map_se_names(set(to_delete), set(remaining))

            if to_delete:
                delete_query = FillerFixerFile.build_delete_sparql_query((URIRef(graph_uri), values))
                sparql_update(client, delete_query, failed_log_fp)

            if any(old != new for old, new in rename_mapping.items()):
                rename_query = FillerFixerFile.build_rename_sparql_query(rename_mapping)
                sparql_update(client, rename_query, failed_log_fp)

            if remaining:
                newest_names = list(set(rename_mapping.values()))
                adaptime_query = FillerFixerFile.build_adapt_invaltime_sparql_query(graph_uri, [str(n) for n in newest_names])
                sparql_update(client, adaptime_query, failed_log_fp)


def apply_datetime_issues(
    client: SPARQLClient,
    dt_issues: List[Tuple[str, str, str, str]],
    failed_log_fp: str,
    chunk_size: int = 100
):
    """Apply datetime fixer issues to the database."""
    if not dt_issues:
        return

    logging.debug(f"Applying {len(dt_issues)} datetime issue(s)")

    issues_tuples = [(URIRef(g), URIRef(s), URIRef(p), dt) for g, s, p, dt in dt_issues]

    for chunk in batched(issues_tuples, chunk_size):
        query = DateTimeFixerFile.build_update_query(chunk)
        sparql_update(client, query, failed_log_fp)


def apply_missing_ps_issues(
    client: SPARQLClient,
    mps_issues: List[Tuple[str, str]],
    meta_dumps: List[Tuple[str, str]],
    failed_log_fp: str,
    chunk_size: int = 100
):
    """Apply missing primary source issues to the database."""
    if not mps_issues:
        return

    logging.debug(f"Applying {len(mps_issues)} missing primary source issue(s)")

    issues_tuples = [(URIRef(s), Literal(t, datatype=XSD.dateTime)) for s, t in mps_issues]

    for chunk in batched(issues_tuples, chunk_size):
        query = MissingPrimSourceFixerFile.build_update_query(chunk, meta_dumps)
        sparql_update(client, query, failed_log_fp)


def apply_multi_pa_issues(
    client: SPARQLClient,
    pa_issues: List[Tuple[str, str]],
    failed_log_fp: str,
    chunk_size: int = 100
):
    """Apply multiple processing agent issues to the database."""
    if not pa_issues:
        return

    logging.debug(f"Applying {len(pa_issues)} multiple processing agent issue(s)")

    issues_tuples = [(URIRef(g), URIRef(s)) for g, s in pa_issues]

    for chunk in batched(issues_tuples, chunk_size):
        query = MultiPAFixerFile.build_update_query(chunk)
        sparql_update(client, query, failed_log_fp)


def apply_multi_object_issues(
    client: SPARQLClient,
    mo_issues: List[Tuple[str, str]],
    meta_dumps: List[Tuple[str, str]],
    failed_log_fp: str,
    chunk_size: int = 100
):
    """Apply multiple object issues to the database."""
    if not mo_issues:
        return

    logging.debug(f"Applying {len(mo_issues)} multiple object issue(s)")

    issues_tuples = [(URIRef(g), Literal(t, datatype=XSD.dateTime)) for g, t in mo_issues]

    for chunk in batched(issues_tuples, chunk_size):
        query = MultiObjectFixerFile.build_update_query(chunk, meta_dumps)
        sparql_update(client, query, failed_log_fp)


def main():
    parser = argparse.ArgumentParser(
        description="Apply dry-run issues to the database from JSON-Lines files"
    )
    parser.add_argument("-e", "--endpoint", type=str, required=True, help="SPARQL endpoint URL")
    parser.add_argument("-d", "--issues-dir", type=str, required=True, help="Directory containing JSON-Lines files with issues")
    parser.add_argument("-m", "--meta-dumps", type=str, required=True, help="Path to meta dumps register JSON file")
    parser.add_argument("--chunk-size", type=int, default=100, help="Number of issues per SPARQL update batch (default: 100)")
    parser.add_argument("--failed-queries-fp", type=str, default=f"sparql_update_db_failed_queries_{datetime.today().strftime('%Y-%m-%d')}.txt", help="Path to log failed SPARQL update queries")
    parser.add_argument("-l", "--log-fp", type=str, default=f"sparql_update_db_{datetime.today().strftime('%Y-%m-%d')}.log", help="File path for the run log")
    parser.add_argument("-c", "--checkpoint-fp", type=str, default="sparql_update_db.checkpoint.json", help="Path to checkpoint file for resuming (default: sparql_update_db.checkpoint.json)")
    parser.add_argument("--no-resume", action="store_true", help="Do not use the checkpoint to resume (start from the beginning)")
    parser.add_argument("-r", "--auto-restart-container", action="store_true", help="Enable memory watchdog to auto-restart the Virtuoso Docker container when memory usage is too high.")
    parser.add_argument("-v", "--virtuoso-container", type=str, default=None, help="Name of the Virtuoso Docker container (required when --auto-restart-container is used).")

    args = parser.parse_args()

    if args.auto_restart_container:
        if not args.virtuoso_container:
            parser.error(
                "--virtuoso-container is required when using --auto-restart-container"
            )
        print("Starting single Virtuoso watchdog (launcher-controlled)")
        start_watchdog_thread(args.virtuoso_container, args.endpoint)
    else:
        print("Watchdog disabled. Processes will not be auto-restarted.")

    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - [%(funcName)s, %(filename)s:%(lineno)d] - %(message)s",
        filename=args.log_fp
    )

    logging.info("=" * 80)
    logging.info("Starting dry-run issues application process")
    logging.info(f"Endpoint: {args.endpoint}")
    logging.info(f"Issues directory: {args.issues_dir}")
    logging.info(f"Meta dumps: {args.meta_dumps}")
    logging.info(f"Chunk size: {args.chunk_size}")
    logging.info(f"Failed queries log: {args.failed_queries_fp}")
    logging.info(f"Checkpoint file: {args.checkpoint_fp}")
    logging.info(f"Resume: {not args.no_resume}")
    logging.info("=" * 80)

    # Load meta dumps register
    logging.info(f"Loading meta dumps register from: {args.meta_dumps}")
    with open(args.meta_dumps, 'r', encoding='utf-8') as f:
        meta_dumps_raw = json.load(f)

    meta_dumps = sorted(
        [(date.fromisoformat(d), url) for d, url in meta_dumps_raw],
        key=lambda x: x[0]
    )

    checkpoint = SparqlUpdatesCheckpoint(args.checkpoint_fp)

    try:
        # Stream and apply issues from JSON-Lines files
        stream_and_fix_on_db(
            args.issues_dir,
            args.endpoint,
            meta_dumps,
            args.failed_queries_fp,
            args.chunk_size,
            checkpoint,
            not args.no_resume
        )

        logging.info("=" * 80)
        logging.info("Successfully applied all SPARQL updates to the database!")
        logging.info("=" * 80)

    except Exception as e:
        logging.error(f"Error applying issues: {e}")
        logging.error(f"Checkpoint state at error: {checkpoint.state if checkpoint else 'N/A'}")
        raise

    finally:
        checkpoint.flush()

    # Clean up the checkpoint file on successful completion (the except branch
    # re-raises, so this point is only reached when no error occurred)
    if os.path.exists(checkpoint.path):
        os.remove(checkpoint.path)
        logging.info("Checkpoint file cleaned up after successful completion")


if __name__ == '__main__':
    main()
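# Example invocation (illustrative endpoint and paths, adjust to your setup;
# the module path assumes the package layout implied by the imports above):
#
#   python -m meta_prov_fixer.update_db_from_issues \
#       -e http://localhost:8890/sparql \
#       -d ./dry_run_issues \
#       -m meta_dumps_register.json \
#       --chunk-size 200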