Coverage for meta_prov_fixer / run_parallel_fix.py: 0%

70 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:12 +0000

1import subprocess 

2import sys 

3import signal 

4from pathlib import Path 

5import argparse 

6import os 

7from typing import Optional 

8 

9from meta_prov_fixer.virtuoso_watchdog import start_watchdog_thread 

10 

# ================== CONFIG DEFAULT ==================

# Fallback values for the --meta-dumps and --endpoint command-line options.
META_DUMPS_DEFAULT = "meta_dumps.json"
ENDPOINT_DEFAULT = "http://localhost:8890/sparql/"

# Input subfolder names; one fixer subprocess is launched per entry.
DIRS = ["br", "ar", "re", "ra", "id"]

# Per-run working directories, relative to the current working directory.
# Each fixer subprocess gets its own file inside each of them.
LOGS_DIR = Path("logs")
CHECKPOINTS_DIR = Path("checkpoints")
CACHES_DIR = Path("caches")
FAILED_DIR = Path("failed_queries")

# NOTE: this runs at import time, so merely importing the module creates
# the four directories (already-existing ones are left alone).
for d in [LOGS_DIR, CHECKPOINTS_DIR, CACHES_DIR, FAILED_DIR]:
    d.mkdir(exist_ok=True)

# Environment passed to every fixer subprocess: a copy of the launcher's
# environment with TQDM_DISABLE=1 added to disable tqdm in the subprocesses.
env = {**os.environ, "TQDM_DISABLE": "1"}

31 

32# ================== FIXER LAUNCH ================== 

33 

def launch_fixer(
    endpoint: str,
    base_in: str,
    base_out: str,
    meta_dumps_fp: str,
    dir_name: str,
    dry_run_db: bool = False,
    dry_run_files: bool = False,
    dry_run_issues_dir: Optional[str] = None
) -> subprocess.Popen:
    """Start one fixer subprocess for a single input subfolder.

    The child is ``poetry run python meta_prov_fixer/main.py`` and is started
    with the module-level ``env`` as its environment. The call does not wait
    for the child to finish.

    Args:
        endpoint: Value passed to the child's ``-e`` option.
        base_in: Parent input directory; the child reads ``<base_in>/<dir_name>``.
        base_out: Parent output directory; the child gets ``<base_out>/<dir_name>``.
        meta_dumps_fp: Value passed to the child's ``-m`` option.
        dir_name: Subfolder name; also used to derive the per-process
            checkpoint, cache, failed-queries and log file names.
        dry_run_db: When true, ``--dry-run-db`` is appended.
        dry_run_files: When true, ``--dry-run-files`` is appended.
        dry_run_issues_dir: When truthy, ``--dry-run-issues-dir`` is appended
            together with ``--dry-run-process-id <dir_name>``.

    Returns:
        The ``subprocess.Popen`` handle of the started child.
    """
    print(f">>> Launching fixer for '{dir_name}'")

    # File isolation: every child writes to files named after its own
    # subfolder, so parallel fixers never share a checkpoint, cache,
    # failed-queries file or log.
    checkpoint_fp = CHECKPOINTS_DIR / f"fix_prov_{dir_name}.checkpoint.json"
    cache_fp = CACHES_DIR / f"filler_issues_{dir_name}.cache.json"
    failed_fp = FAILED_DIR / f"failed_{dir_name}.txt"
    log_fp = LOGS_DIR / f"provenance_fix_{dir_name}.log"

    argv = ["poetry", "run", "python", "meta_prov_fixer/main.py"]
    argv += ["-e", endpoint]
    argv += ["-i", f"{base_in}/{dir_name}"]
    argv += ["-o", f"{base_out}/{dir_name}"]
    argv += ["-m", meta_dumps_fp]
    argv += ["--checkpoint-fp", str(checkpoint_fp)]
    argv += ["--cache-fp", str(cache_fp)]
    argv += ["--failed-queries-fp", str(failed_fp)]
    argv += ["-l", str(log_fp)]

    # Optional dry-run switches are only forwarded when requested.
    if dry_run_db:
        argv += ["--dry-run-db"]
    if dry_run_files:
        argv += ["--dry-run-files"]
    if dry_run_issues_dir:
        # dir_name doubles as the process id so each child's id is unique.
        argv += [
            "--dry-run-issues-dir", dry_run_issues_dir,
            "--dry-run-process-id", dir_name,
        ]

    return subprocess.Popen(argv, env=env)

69 

70# ================== MAIN ================== 

71 

def _interrupt_and_reap(processes):
    """Send SIGINT to every still-running child, then wait for all of them.

    Waiting keeps the launcher alive until the children have finished their
    own shutdown. A second KeyboardInterrupt received while waiting escalates
    to ``kill()`` for whatever is still running.
    """
    for _, proc in processes:
        if proc.poll() is not None:
            continue  # already exited, nothing to signal
        try:
            # SIGINT (rather than terminate) so the child can clean up gracefully.
            proc.send_signal(signal.SIGINT)
        except ProcessLookupError:
            # Process ended between poll() and send_signal().
            pass

    try:
        for _, proc in processes:
            proc.wait()
    except KeyboardInterrupt:
        print("\nSecond KeyboardInterrupt received: killing remaining fixer processes")
        for _, proc in processes:
            if proc.poll() is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
        for _, proc in processes:
            proc.wait()


def main() -> None:
    """Parse the command line, launch one fixer per entry of DIRS and wait for all.

    When ``--auto-restart-container`` is given, a single watchdog is started
    via ``start_watchdog_thread`` before the fixers are launched;
    ``--virtuoso-container`` is mandatory in that case.

    The function always ends through ``sys.exit``:
        * 0 when every fixer returned 0;
        * the return code of the last fixer (in DIRS order) that returned
          non-zero otherwise;
        * 1 when interrupted with Ctrl-C. In that case every launched fixer
          is sent SIGINT and the launcher waits for them to exit first.
    """
    parser = argparse.ArgumentParser(description="Run multiple fixer processes in parallel.")
    parser.add_argument("--endpoint", type=str, default=ENDPOINT_DEFAULT,
                        help="SPARQL endpoint URL")
    parser.add_argument("--base-dir", type=str, required=True,
                        help="Directory containing input subfolders")
    parser.add_argument("--out-dir", type=str, required=True,
                        help="Directory where fixed outputs will be saved (in subfolders)")
    parser.add_argument("--meta-dumps", type=str, default=META_DUMPS_DEFAULT,
                        help="Path to meta dumps register JSON file")

    parser.add_argument(
        "-r", "--auto-restart-container", action="store_true",
        help="Enable memory watchdog to auto-restart the Virtuoso Docker container when memory usage is too high."
    )

    parser.add_argument(
        "-v", "--virtuoso-container", type=str, default=None,
        help="Name of the Virtuoso Docker container (required when --auto-restart-container is used)."
    )

    parser.add_argument(
        "--dry-run-db", action="store_true",
        help="Skip SPARQL updates to endpoint (write only fixed files)."
    )

    parser.add_argument(
        "--dry-run-files", action="store_true",
        help="Skip writing fixed files to out-dir (update only database)."
    )

    parser.add_argument(
        "--dry-run-issues-dir", type=str, default=None,
        help="Directory where to write issues found during dry-run as JSON-Lines files."
    )

    args = parser.parse_args()

    if args.auto_restart_container:
        if not args.virtuoso_container:
            # parser.error() prints the usage message and exits.
            parser.error(
                "--virtuoso-container is required when using --auto-restart-container"
            )
        print("Starting single Virtuoso watchdog (launcher-controlled)")
        start_watchdog_thread(args.virtuoso_container, args.endpoint)
    else:
        print("Watchdog disabled. Processes will not be auto-restarted.")

    processes = []
    exit_code = 0

    try:
        # Launching happens inside the try block so that a Ctrl-C arriving
        # while children are still being started also shuts down the ones
        # that are already running.
        for d in DIRS:
            processes.append((d, launch_fixer(
                args.endpoint,
                args.base_dir,
                args.out_dir,
                args.meta_dumps,
                d,
                dry_run_db=args.dry_run_db,
                dry_run_files=args.dry_run_files,
                dry_run_issues_dir=args.dry_run_issues_dir
            )))

        # Children run concurrently; they are only *reported* in DIRS order.
        for d, proc in processes:
            ret = proc.wait()
            if ret != 0:
                print(f"Fixer '{d}' exited with code {ret}")
                exit_code = ret
            else:
                print(f"Fixer '{d}' completed successfully")

    except KeyboardInterrupt:
        print("\nKeyboardInterrupt received: terminating all fixer processes")
        # Do not exit before the children are gone: leaving immediately would
        # abandon them mid-cleanup with nothing supervising them.
        _interrupt_and_reap(processes)
        sys.exit(1)

    print("All fixer processes completed")
    sys.exit(exit_code)

160 

161 

# Script entry point: run the launcher only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()