#!/usr/bin/env python3
"""
Script to apply SPARQL updates to the database for already-detected issues.

This script reads issues from the JSON-Lines files generated by dry-run mode
and executes the corresponding SPARQL UPDATE queries against the database.
"""

import os
import json
import time
import argparse
import logging
import traceback
from pathlib import Path
from datetime import datetime, date
from typing import List, Optional, Tuple

from rdflib import URIRef, Literal
from rdflib.namespace import XSD

from sparqlite import SPARQLClient, QueryError, EndpointError
from meta_prov_fixer.src import (
    FillerFixerFile,
    DateTimeFixerFile,
    MissingPrimSourceFixerFile,
    MultiPAFixerFile,
    MultiObjectFixerFile,
    sparql_update
)
from meta_prov_fixer.utils import batched, normalise_datetime, get_previous_meta_dump_uri
from meta_prov_fixer.virtuoso_watchdog import start_watchdog_thread


class SparqlUpdatesCheckpoint:
    """
    Checkpoint class for tracking progress in update_db_from_issues.py.

    Tracks which JSON-Lines files have been processed and how many issues
    of each type have been applied to the database.
    """

    def __init__(self, path: str):
        self.path = path
        self.state = None
        self.dirty = False
        self.load()

    def load(self):
        """Load checkpoint state from file if it exists."""
        if os.path.exists(self.path):
            with open(self.path, "r", encoding="utf-8") as f:
                self.state = json.load(f)
        else:
            self.state = None

    def _atomic_write(self, data: dict, retries: int = 5, delay: float = 0.05):
        """Atomically write data to checkpoint file."""
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        # os.replace is atomic; retry because on Windows it can raise
        # PermissionError while the destination file is briefly locked.
        for _ in range(retries):
            try:
                os.replace(tmp_path, self.path)
                return
            except PermissionError:
                time.sleep(delay)
        raise PermissionError(f"Failed to replace checkpoint file after {retries} retries")

    def update_state(
        self,
        current_file: str,
        line_number: int,
        ff_applied: int,
        dt_applied: int,
        mps_applied: int,
        pa_applied: int,
        mo_applied: int
    ):
        """Update checkpoint state with current progress."""
        self.state = {
            "current_file": current_file,
            "line_number": line_number,
            "ff_applied": ff_applied,
            "dt_applied": dt_applied,
            "mps_applied": mps_applied,
            "pa_applied": pa_applied,
            "mo_applied": mo_applied,
            "timestamp": datetime.now().isoformat()
        }
        self.dirty = True

    def flush(self):
        """Flush checkpoint state to disk."""
        if self.dirty and self.state:
            self._atomic_write(self.state)
            self.dirty = False

    def get_resume_line(self, filename: str) -> int:
        """
        Get the line number to resume from for a given file.

        Returns 0 if there is no checkpoint or it refers to a different file.
        Returns the checkpointed line number if resuming the same file.
        """
        if not self.state or self.state["current_file"] != filename:
            return 0
        return self.state["line_number"]

    def get_applied_counts(self) -> Tuple[int, int, int, int, int]:
        """Get counts of applied issues from checkpoint."""
        if not self.state:
            return (0, 0, 0, 0, 0)
        return (
            self.state["ff_applied"],
            self.state["dt_applied"],
            self.state["mps_applied"],
            self.state["pa_applied"],
            self.state["mo_applied"]
        )


def stream_and_fix_on_db(
    issues_dir: str,
    endpoint: str,
    meta_dumps: List[Tuple[date, str]],
    failed_log_fp: str,
    chunk_size: int = 100,
    checkpoint: Optional[SparqlUpdatesCheckpoint] = None,
    resume: bool = True
):
    """
    Stream JSON-Lines files and apply SPARQL updates in batches to the database.

    Args:
        issues_dir: Directory containing JSON-Lines files storing detected issues
        endpoint: SPARQL endpoint URL for executing queries
        meta_dumps: Meta dumps register as sorted (date, URL) pairs
        failed_log_fp: Path to the failed-queries log
        chunk_size: Number of issues per SPARQL update batch
        checkpoint: Checkpoint object for resuming; a default one is created if omitted
        resume: If True, use the checkpoint to skip already-processed data
    """
    if checkpoint is None:
        # Create the default checkpoint here rather than as a default argument,
        # which would be instantiated once at import time and shared across calls.
        checkpoint = SparqlUpdatesCheckpoint("sparql_update_db.checkpoint.json")

    client = SPARQLClient(endpoint)
    issues_dir = Path(issues_dir)
    jsonl_files = sorted(issues_dir.glob('*.jsonl'))
    logging.info(f"Found {len(jsonl_files)} JSON-Lines file(s)")
    logging.info(f"Checkpoint state at start: {checkpoint.state}")

    # Counters for statistics
    total_files = 0
    total_ff_issues = 0
    total_dt_issues = 0
    total_mps_issues = 0
    total_pa_issues = 0
    total_mo_issues = 0

    # Load applied counts from checkpoint if resuming
    if resume and checkpoint.state:
        total_ff_issues, total_dt_issues, total_mps_issues, total_pa_issues, total_mo_issues = checkpoint.get_applied_counts()
        logging.info("Resuming from checkpoint:")
        logging.info(f"  Filler issues already applied: {total_ff_issues}")
        logging.info(f"  DateTime issues already applied: {total_dt_issues}")
        logging.info(f"  Missing PS issues already applied: {total_mps_issues}")
        logging.info(f"  Multi PA issues already applied: {total_pa_issues}")
        logging.info(f"  Multi Object issues already applied: {total_mo_issues}")

    # Batch accumulators for each issue type
    ff_batch = []
    dt_batch = []
    mps_batch = []
    pa_batch = []
    mo_batch = []

    def flush_batches():
        """Flush all issue batches by applying the appropriate fixes to the database."""
        nonlocal total_ff_issues, total_dt_issues, total_mps_issues, total_pa_issues, total_mo_issues

        if ff_batch:
            apply_filler_issues(client, ff_batch, failed_log_fp, chunk_size)
            total_ff_issues += len(ff_batch)
            ff_batch.clear()

        if dt_batch:
            apply_datetime_issues(client, dt_batch, failed_log_fp, chunk_size)
            total_dt_issues += len(dt_batch)
            dt_batch.clear()

        if mps_batch:
            apply_missing_ps_issues(client, mps_batch, meta_dumps, failed_log_fp, chunk_size)
            total_mps_issues += len(mps_batch)
            mps_batch.clear()

        if pa_batch:
            apply_multi_pa_issues(client, pa_batch, failed_log_fp, chunk_size)
            total_pa_issues += len(pa_batch)
            pa_batch.clear()

        if mo_batch:
            apply_multi_object_issues(client, mo_batch, meta_dumps, failed_log_fp, chunk_size)
            total_mo_issues += len(mo_batch)
            mo_batch.clear()

    times_per_file = []

    logging.info(f"Processing JSON-Lines files in {issues_dir}...")

    # Process each file line by line
    for file_idx, jsonl_file in enumerate(jsonl_files):
        # Checkpoint-based file skipping
        resume_line = 0
        if resume and checkpoint.state:
            checkpointed_file = checkpoint.state["current_file"]

            # Skip files that come before the checkpointed file in sorted order
            if jsonl_file.name < checkpointed_file:
                logging.debug(f"Skipping already completed: {jsonl_file.name}")
                continue
            # Resume from the specific line in the checkpointed file
            elif jsonl_file.name == checkpointed_file:
                resume_line = checkpoint.state["line_number"]
                logging.info(f"Resuming from file '{jsonl_file.name}', line {resume_line}")
            # Process all files after the checkpointed file normally
            else:
                logging.debug(f"Processing: {jsonl_file.name}")
        else:
            logging.debug(f"Processing: {jsonl_file.name}")

        start_time = time.time()
        # Ensure line_num is defined even for empty files, so the per-file
        # checkpoint update below always has a last-line value.
        line_num = resume_line

        with open(jsonl_file, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                # Skip lines we've already processed
                if resume and line_num <= resume_line:
                    continue

                line = line.strip()
                if not line:
                    continue

                try:
                    record = json.loads(line)
                    # Count files only when actually processing (not when skipping lines)
                    if line_num == 1 and resume_line == 0:
                        total_files += 1

                    # Filler issues: each item is [graph_uri, {'to_delete': [...], 'remaining_snapshots': [...]}]
                    ff_items = record.get('ff', [])
                    for item in ff_items:
                        if isinstance(item, list) and len(item) == 2:
                            ff_batch.append((item[0], {'to_delete': item[1]['to_delete'], 'remaining_snapshots': item[1]['remaining_snapshots']}))
                        else:
                            ff_batch.append(item)

                    # DateTime issues: each item is [graph_uri, snapshot_uri, predicate_uri, invalid_datetime]
                    dt_batch.extend([tuple(item) for item in record.get('dt', [])])

                    # Missing Primary Source issues: each item is [snapshot_uri, generation_time]
                    mps_batch.extend([tuple(item) for item in record.get('mps', [])])

                    # Multiple Processing Agent issues: each item is [graph_uri, snapshot_uri]
                    pa_batch.extend([tuple(item) for item in record.get('pa', [])])

                    # Multiple Object issues: each item is [graph_uri, generation_time]
                    mo_batch.extend([tuple(item) for item in record.get('mo', [])])

                    # Flush batches when they reach chunk_size
                    if (len(ff_batch) >= chunk_size or
                            len(dt_batch) >= chunk_size or
                            len(mps_batch) >= chunk_size or
                            len(pa_batch) >= chunk_size or
                            len(mo_batch) >= chunk_size):
                        flush_batches()

                        checkpoint.update_state(
                            jsonl_file.name,
                            line_num,
                            total_ff_issues,
                            total_dt_issues,
                            total_mps_issues,
                            total_pa_issues,
                            total_mo_issues
                        )

                except KeyboardInterrupt:
                    checkpoint.flush()
                    logging.error("KeyboardInterrupt")
                    client.close()
                    raise

                except Exception as e:
                    checkpoint.flush()
                    logging.error(e)
                    logging.error(traceback.format_exc())

        # Flush any remaining batches after each file
        flush_batches()

        elapsed_file = time.time() - start_time

        times_per_file.append(elapsed_file)

        logging.info(f"Finished processing: {jsonl_file.name} in {elapsed_file:.2f} seconds.")
        if file_idx % 5 == 0:
            avg_time = sum(times_per_file) / len(times_per_file)
            est_remaining = avg_time * (len(jsonl_files) - file_idx - 1)
            logging.info(f"Average time per file over the last {len(times_per_file)} file(s): {avg_time:.2f} seconds. Estimated time remaining: {est_remaining/3600:.2f} hours")
            times_per_file.clear()

        # Recreate the SPARQL client after every file to clear accumulated
        # pycurl state and avoid memory issues.
        client.close()
        client = SPARQLClient(endpoint)

        # Mark this file as completed in checkpoint
        checkpoint.update_state(
            jsonl_file.name,
            line_num,  # Last line number
            total_ff_issues,
            total_dt_issues,
            total_mps_issues,
            total_pa_issues,
            total_mo_issues
        )
        checkpoint.flush()

    # Final flush to ensure all batches are applied
    flush_batches()

    # Final checkpoint update
    checkpoint.update_state(
        "completed",
        0,
        total_ff_issues,
        total_dt_issues,
        total_mps_issues,
        total_pa_issues,
        total_mo_issues
    )
    checkpoint.flush()
    client.close()

    # Log statistics
    logging.info("=" * 80)
    logging.info("Summary of issues applied:")
    logging.info(f"  Total files processed: {total_files}")
    logging.info(f"  Filler issues: {total_ff_issues}")
    logging.info(f"  DateTime issues: {total_dt_issues}")
    logging.info(f"  Missing Primary Source issues: {total_mps_issues}")
    logging.info(f"  Multiple Processing Agent issues: {total_pa_issues}")
    logging.info(f"  Multiple Object issues: {total_mo_issues}")
    logging.info(f"  Total: {total_ff_issues + total_dt_issues + total_mps_issues + total_pa_issues + total_mo_issues}")
    logging.info("=" * 80)


def apply_filler_issues(
    client: SPARQLClient,
    ff_issues: List[Tuple[str, dict]],
    failed_log_fp: str,
    chunk_size: int = 100
):
    """Apply filler fixer issues to the database."""
    if not ff_issues:
        return

    logging.debug(f"Applying {len(ff_issues)} filler issue(s)")

    for chunk in batched(ff_issues, chunk_size):
        for graph_uri, values in chunk:
            to_delete = [URIRef(u) for u in values['to_delete']]
            remaining = [URIRef(u) for u in values['remaining_snapshots']]

            rename_mapping = FillerFixerFile.map_se_names(set(to_delete), set(remaining))

            if to_delete:
                delete_query = FillerFixerFile.build_delete_sparql_query((URIRef(graph_uri), values))
                sparql_update(client, delete_query, failed_log_fp)

            if any(old != new for old, new in rename_mapping.items()):
                rename_query = FillerFixerFile.build_rename_sparql_query(rename_mapping)
                sparql_update(client, rename_query, failed_log_fp)

            if remaining:
                newest_names = list(set(rename_mapping.values()))
                adaptime_query = FillerFixerFile.build_adapt_invaltime_sparql_query(graph_uri, [str(n) for n in newest_names])
                sparql_update(client, adaptime_query, failed_log_fp)


def apply_datetime_issues(
    client: SPARQLClient,
    dt_issues: List[Tuple[str, str, str, str]],
    failed_log_fp: str,
    chunk_size: int = 100
):
    """Apply datetime fixer issues to the database."""
    if not dt_issues:
        return

    logging.debug(f"Applying {len(dt_issues)} datetime issue(s)")

    issues_tuples = [(URIRef(g), URIRef(s), URIRef(p), dt) for g, s, p, dt in dt_issues]

    for chunk in batched(issues_tuples, chunk_size):
        query = DateTimeFixerFile.build_update_query(chunk)
        sparql_update(client, query, failed_log_fp)


def apply_missing_ps_issues(
    client: SPARQLClient,
    mps_issues: List[Tuple[str, str]],
    meta_dumps: List[Tuple[date, str]],
    failed_log_fp: str,
    chunk_size: int = 100
):
    """Apply missing primary source issues to the database."""
    if not mps_issues:
        return

    logging.debug(f"Applying {len(mps_issues)} missing primary source issue(s)")

    issues_tuples = [(URIRef(s), Literal(t, datatype=XSD.dateTime)) for s, t in mps_issues]

    for chunk in batched(issues_tuples, chunk_size):
        query = MissingPrimSourceFixerFile.build_update_query(chunk, meta_dumps)
        sparql_update(client, query, failed_log_fp)


def apply_multi_pa_issues(
    client: SPARQLClient,
    pa_issues: List[Tuple[str, str]],
    failed_log_fp: str,
    chunk_size: int = 100
):
    """Apply multiple processing agent issues to the database."""
    if not pa_issues:
        return

    logging.debug(f"Applying {len(pa_issues)} multiple processing agent issue(s)")

    issues_tuples = [(URIRef(g), URIRef(s)) for g, s in pa_issues]

    for chunk in batched(issues_tuples, chunk_size):
        query = MultiPAFixerFile.build_update_query(chunk)
        sparql_update(client, query, failed_log_fp)


def apply_multi_object_issues(
    client: SPARQLClient,
    mo_issues: List[Tuple[str, str]],
    meta_dumps: List[Tuple[date, str]],
    failed_log_fp: str,
    chunk_size: int = 100
):
    """Apply multiple object issues to the database."""
    if not mo_issues:
        return

    logging.debug(f"Applying {len(mo_issues)} multiple object issue(s)")

    issues_tuples = [(URIRef(g), Literal(t, datatype=XSD.dateTime)) for g, t in mo_issues]

    for chunk in batched(issues_tuples, chunk_size):
        query = MultiObjectFixerFile.build_update_query(chunk, meta_dumps)
        sparql_update(client, query, failed_log_fp)


def main():
    parser = argparse.ArgumentParser(
        description="Apply dry-run issues to database from JSON-Lines files"
    )
    parser.add_argument("-e", "--endpoint", type=str, required=True, help="SPARQL endpoint URL")
    parser.add_argument("-d", "--issues-dir", type=str, required=True, help="Directory containing JSON-Lines files with issues")
    parser.add_argument("-m", "--meta-dumps", type=str, required=True, help="Path to meta dumps register JSON file")
    parser.add_argument("--chunk-size", type=int, default=100, help="Number of issues per SPARQL update batch (default: 100)")
    parser.add_argument("--failed-queries-fp", type=str, default=f"sparql_update_db_failed_queries_{datetime.today().strftime('%Y-%m-%d')}.txt", help="Path to log failed SPARQL update queries")
    parser.add_argument("-l", "--log-fp", type=str, default=f"sparql_update_db_{datetime.today().strftime('%Y-%m-%d')}.log", help="File path for the run log")
    parser.add_argument("-c", "--checkpoint-fp", type=str, default="sparql_update_db.checkpoint.json", help="Path to checkpoint file for resuming (default: sparql_update_db.checkpoint.json)")
    parser.add_argument("--no-resume", action="store_true", help="Do not use checkpoint to resume (start from beginning)")
    parser.add_argument("-r", "--auto-restart-container", action="store_true", help="Enable memory watchdog to auto-restart the Virtuoso Docker container when memory usage is too high.")
    parser.add_argument("-v", "--virtuoso-container", type=str, default=None, help="Name of the Virtuoso Docker container (required when --auto-restart-container is used).")

    args = parser.parse_args()

    if args.auto_restart_container:
        if not args.virtuoso_container:
            parser.error(
                "--virtuoso-container is required when using --auto-restart-container"
            )
        print("Starting single Virtuoso watchdog (launcher-controlled)")
        start_watchdog_thread(args.virtuoso_container, args.endpoint)
    else:
        print("Watchdog disabled. Processes will not be auto-restarted.")

    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - [%(funcName)s, %(filename)s:%(lineno)d] - %(message)s",
        filename=args.log_fp
    )

    logging.info("=" * 80)
    logging.info("Starting dry-run issues application process")
    logging.info(f"Endpoint: {args.endpoint}")
    logging.info(f"Issues directory: {args.issues_dir}")
    logging.info(f"Meta dumps: {args.meta_dumps}")
    logging.info(f"Chunk size: {args.chunk_size}")
    logging.info(f"Failed queries log: {args.failed_queries_fp}")
    logging.info(f"Checkpoint file: {args.checkpoint_fp}")
    logging.info(f"Resume: {not args.no_resume}")
    logging.info("=" * 80)

    # Load meta dumps register
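    # Expected register format: a JSON array of [ISO-date, URL] pairs, e.g.
    # (illustrative; dates and URLs are placeholders):
    #   [["2024-06-01", "https://example.org/meta/2024-06-01.zip"],
    #    ["2024-09-01", "https://example.org/meta/2024-09-01.zip"]]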
    logging.info(f"Loading meta dumps register from: {args.meta_dumps}")
    with open(args.meta_dumps, 'r', encoding='utf-8') as f:
        meta_dumps_raw = json.load(f)

    meta_dumps = sorted(
        [(date.fromisoformat(d), url) for d, url in meta_dumps_raw],
        key=lambda x: x[0]
    )

    checkpoint = SparqlUpdatesCheckpoint(args.checkpoint_fp)

    try:
        # Stream and apply issues from JSON-Lines files
        stream_and_fix_on_db(
            args.issues_dir,
            args.endpoint,
            meta_dumps,
            args.failed_queries_fp,
            args.chunk_size,
            checkpoint,
            not args.no_resume
        )

        logging.info("=" * 80)
        logging.info("Successfully applied all SPARQL updates to database!")
        logging.info("=" * 80)

    except Exception as e:
        logging.error(f"Error applying issues: {e}")
        logging.error(f"Checkpoint state at error: {checkpoint.state if checkpoint else 'N/A'}")
        raise

    finally:
        checkpoint.flush()

    # Clean up checkpoint file on successful completion; this point is only
    # reached without an exception, since the except clause above re-raises.
    if os.path.exists(checkpoint.path):
        os.remove(checkpoint.path)
        logging.info("Checkpoint file cleaned up after successful completion")


if __name__ == '__main__':
    main()