Coverage for meta_prov_fixer / run_parallel_fix.py: 0%
70 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:12 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:12 +0000
1import subprocess
2import sys
3import signal
4from pathlib import Path
5import argparse
6import os
7from typing import Optional
9from meta_prov_fixer.virtuoso_watchdog import start_watchdog_thread
# ================== CONFIG DEFAULT ==================

# Defaults for the --meta-dumps and --endpoint command-line options.
META_DUMPS_DEFAULT = "meta_dumps.json"
ENDPOINT_DEFAULT = "http://localhost:8890/sparql/"
# CONTAINER_DEFAULT = "oc-meta-prov"

# One fixer subprocess is launched per entry.
DIRS = ["br", "ar", "re", "ra", "id"]

# Working directories for per-fixer artefacts, relative to the current
# working directory.
LOGS_DIR = Path("logs")
CHECKPOINTS_DIR = Path("checkpoints")
CACHES_DIR = Path("caches")
FAILED_DIR = Path("failed_queries")

# Create the working directories at import time; existing ones are kept.
for d in (LOGS_DIR, CHECKPOINTS_DIR, CACHES_DIR, FAILED_DIR):
    d.mkdir(exist_ok=True)

# Environment handed to every subprocess: a copy of the current environment
# with tqdm progress bars disabled.
env = dict(os.environ, TQDM_DISABLE="1")
32# ================== FIXER LAUNCH ==================
def launch_fixer(
    endpoint: str,
    base_in: str,
    base_out: str,
    meta_dumps_fp: str,
    dir_name: str,
    dry_run_db: bool = False,
    dry_run_files: bool = False,
    dry_run_issues_dir: Optional[str] = None
) -> subprocess.Popen:
    """Start one fixer subprocess for the ``dir_name`` subfolder.

    The child is ``poetry run python meta_prov_fixer/main.py``; this function
    returns as soon as it has been spawned and does not wait for it.

    Args:
        endpoint: SPARQL endpoint URL, passed as ``-e``.
        base_in: Input base directory; ``<base_in>/<dir_name>`` is passed as ``-i``.
        base_out: Output base directory; ``<base_out>/<dir_name>`` is passed as ``-o``.
        meta_dumps_fp: Path of the meta dumps register, passed as ``-m``.
        dir_name: Subfolder name; also embedded in the checkpoint, cache,
            failed-queries and log file names.
        dry_run_db: When truthy, append ``--dry-run-db``.
        dry_run_files: When truthy, append ``--dry-run-files``.
        dry_run_issues_dir: When truthy, append ``--dry-run-issues-dir`` with
            this value and ``--dry-run-process-id`` with ``dir_name``.

    Returns:
        The ``subprocess.Popen`` handle of the spawned child.
    """
    print(f">>> Launching fixer for '{dir_name}'")

    # File isolation: every artefact path embeds dir_name, so each fixer
    # gets its own checkpoint, cache, failed-queries file and log.
    checkpoint_fp = CHECKPOINTS_DIR / f"fix_prov_{dir_name}.checkpoint.json"
    cache_fp = CACHES_DIR / f"filler_issues_{dir_name}.cache.json"
    failed_fp = FAILED_DIR / f"failed_{dir_name}.txt"
    log_fp = LOGS_DIR / f"provenance_fix_{dir_name}.log"

    argv = ["poetry", "run", "python", "meta_prov_fixer/main.py"]
    argv += ["-e", endpoint]
    argv += ["-i", f"{base_in}/{dir_name}"]
    argv += ["-o", f"{base_out}/{dir_name}"]
    argv += ["-m", meta_dumps_fp]
    argv += ["--checkpoint-fp", str(checkpoint_fp)]
    argv += ["--cache-fp", str(cache_fp)]
    argv += ["--failed-queries-fp", str(failed_fp)]
    argv += ["-l", str(log_fp)]

    # Optional dry-run switches, appended in a fixed order.
    argv += ["--dry-run-db"] if dry_run_db else []
    argv += ["--dry-run-files"] if dry_run_files else []
    if dry_run_issues_dir:
        argv += [
            "--dry-run-issues-dir", dry_run_issues_dir,
            # dir_name doubles as the process id so it is unique per fixer.
            "--dry-run-process-id", dir_name,
        ]

    return subprocess.Popen(argv, env=env)
70# ================== MAIN ==================
def main():
    """Parse the command line, run one fixer per entry of ``DIRS``, and wait.

    Optionally starts the Virtuoso watchdog thread first (``-r`` together with
    ``-v``). All fixers are launched up front and then waited on in ``DIRS``
    order.

    Exit status (via ``sys.exit``):
        * ``0`` when every fixer returned 0;
        * the return code of the last fixer (in ``DIRS`` order) that returned
          non-zero otherwise;
        * ``1`` when interrupted with Ctrl+C.

    On Ctrl+C every launched fixer is sent ``SIGINT`` and the launcher then
    waits a bounded time for them to exit, so their clean-up can complete
    before the launcher itself terminates. Fixers still running after the
    grace period are left alone (not killed).
    """
    parser = argparse.ArgumentParser(description="Run multiple fixer processes in parallel.")
    parser.add_argument("--endpoint", type=str, default=ENDPOINT_DEFAULT,
                        help="SPARQL endpoint URL")
    parser.add_argument("--base-dir", type=str, required=True,
                        help="Directory containing input subfolders")
    parser.add_argument("--out-dir", type=str, required=True,
                        help="Directory where fixed outputs will be saved (in subfolders)")
    parser.add_argument("--meta-dumps", type=str, default=META_DUMPS_DEFAULT,
                        help="Path to meta dumps register JSON file")

    parser.add_argument(
        "-r", "--auto-restart-container", action="store_true",
        help="Enable memory watchdog to auto-restart the Virtuoso Docker container when memory usage is too high."
    )

    parser.add_argument(
        "-v", "--virtuoso-container", type=str, default=None,
        help="Name of the Virtuoso Docker container (required when --auto-restart-container is used)."
    )

    parser.add_argument(
        "--dry-run-db", action="store_true",
        help="Skip SPARQL updates to endpoint (write only fixed files)."
    )

    parser.add_argument(
        "--dry-run-files", action="store_true",
        help="Skip writing fixed files to out-dir (update only database)."
    )

    parser.add_argument(
        "--dry-run-issues-dir", type=str, default=None,
        help="Directory where to write issues found during dry-run as JSON-Lines files."
    )

    args = parser.parse_args()

    # The watchdog needs a container name; parser.error() prints usage and exits.
    if args.auto_restart_container and not args.virtuoso_container:
        parser.error(
            "--virtuoso-container is required when using --auto-restart-container"
        )

    if args.auto_restart_container:
        print("Starting single Virtuoso watchdog (launcher-controlled)")
        start_watchdog_thread(args.virtuoso_container, args.endpoint)
    else:
        print("Watchdog disabled. Processes will not be auto-restarted.")

    # Seconds to wait, in total, for children to exit after Ctrl+C.
    shutdown_grace_s = 30

    processes = []
    exit_code = 0

    try:
        # Launch fixers. This is an explicit loop inside the try (not a
        # comprehension before it) so that a Ctrl+C arriving mid-launch still
        # finds the already-started children in `processes`.
        for d in DIRS:
            processes.append((d, launch_fixer(
                args.endpoint,
                args.base_dir,
                args.out_dir,
                args.meta_dumps,
                d,
                dry_run_db=args.dry_run_db,
                dry_run_files=args.dry_run_files,
                dry_run_issues_dir=args.dry_run_issues_dir
            )))

        for d, proc in processes:
            ret = proc.wait()
            if ret != 0:
                print(f"Fixer '{d}' exited with code {ret}")
                exit_code = ret
            else:
                print(f"Fixer '{d}' completed successfully")

    except KeyboardInterrupt:
        print("\nKeyboardInterrupt received: terminating all fixer processes")
        for _, p in processes:
            # Send SIGINT to each process so they can clean up gracefully
            try:
                p.send_signal(signal.SIGINT)
            except ProcessLookupError:
                # Process already terminated
                pass

        # Do not exit while children are still cleaning up: wait for them,
        # sharing one deadline across all of them so the total delay is
        # bounded by shutdown_grace_s rather than multiplied per process.
        import time  # only needed on this interrupt path
        deadline = time.monotonic() + shutdown_grace_s
        for d, p in processes:
            remaining = max(0.0, deadline - time.monotonic())
            try:
                p.wait(timeout=remaining)
            except subprocess.TimeoutExpired:
                print(f"Fixer '{d}' still running after {shutdown_grace_s}s; not waiting further")
        sys.exit(1)

    print("All fixer processes completed")
    sys.exit(exit_code)
# Run the launcher only when this file is executed as a script.
if __name__ == "__main__":
    main()