Coverage for meta_prov_fixer / legacy / legacy_main.py: 0%
53 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:12 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:12 +0000
1#!/usr/bin/env python3
2import argparse
3import json
4import logging
5import datetime
7from meta_prov_fixer.legacy.fix_via_sparql import fix_process, fix_process_reading_from_files
8from meta_prov_fixer.virtuoso_watchdog import start_watchdog_thread
def load_meta_dumps(json_path: str) -> list[tuple[str, str]]:
    """
    Load meta_dumps_pub_dates from a JSON file.

    The JSON file must contain a list of [date, url] pairs.

    Args:
        json_path: Path to the JSON file.

    Returns:
        A list of (date, url) tuples, both elements coerced to str.

    Raises:
        argparse.ArgumentTypeError: If the file cannot be read, is not
            valid JSON, or does not contain a list of 2-element lists.
            (argparse reports this as a clean CLI error when the function
            is used as an argument ``type=``.)
    """
    try:
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Validate the structure explicitly so the wrapped error message
        # is informative instead of an empty ValueError.
        if not isinstance(data, list) or not all(
            isinstance(pair, list) and len(pair) == 2 for pair in data
        ):
            raise ValueError("expected a JSON list of [date, url] pairs")
        return [(str(date), str(url)) for date, url in data]
    except Exception as e:
        # Chain the original exception so the traceback stays diagnosable.
        raise argparse.ArgumentTypeError(
            f"Failed to load meta_dumps_pub_dates from '{json_path}': {e}"
        ) from e
def _build_parser() -> argparse.ArgumentParser:
    """Build and return the CLI argument parser for the pipeline."""
    parser = argparse.ArgumentParser(
        description="Run the pipeline for fixing Meta provenance triplestore"
    )
    parser.add_argument(
        "-e", "--endpoint", type=str, required=True,
        help="SPARQL endpoint URL"
    )
    parser.add_argument(
        "-m", "--meta-dumps", type=load_meta_dumps, required=True,
        help="Path to JSON file with list of [date, URL] pairs"
    )
    parser.add_argument(
        "-i", "--issues-log-dir", type=str, default=None,
        help="Directory to save data to fix. Required if using --dump-dir."
    )
    parser.add_argument(
        "-c", "--checkpoint", type=str, default="checkpoint.json",
        help="Path to checkpoint file"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Run in dry-run mode (no update queries made to the database)."
    )
    parser.add_argument(
        "-d", "--dump-dir", type=str, default=None,
        help="Path to directory containing RDF dumps. If provided, "
        "the pipeline reads from files instead of querying the endpoint."
    )
    parser.add_argument(
        "-l", "--log-fp", type=str,
        default=f"provenance_fix_{datetime.date.today().strftime('%Y-%m-%d')}.log",
        help="File path to log file."
    )
    parser.add_argument(
        "-r", "--auto-restart-container", action="store_true",
        help="Enable memory watchdog to auto-restart the Virtuoso Docker container when memory usage is too high."
    )
    parser.add_argument(
        "-v", "--virtuoso-container", type=str, default=None,
        help="Name of the Virtuoso Docker container (required when --auto-restart-container is used)."
    )
    return parser


def main():
    """Entry point: parse CLI arguments, configure logging, optionally start
    the Virtuoso memory watchdog, then run the provenance fixing pipeline in
    either file-based or SPARQL-endpoint mode."""
    parser = _build_parser()
    args = parser.parse_args()

    # --- Cross-argument validation ---
    if args.auto_restart_container and not args.virtuoso_container:
        parser.error(
            "--virtuoso-container is required when using --auto-restart-container"
        )
    if args.dump_dir and not args.issues_log_dir:
        parser.error("--issues-log-dir (-i) is required when using --dump-dir")

    # --- Logging setup ---
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        filename=args.log_fp
    )

    logging.info("Starting provenance fixing pipeline…")
    logging.info(f"Endpoint: {args.endpoint}")
    logging.info(f"Dump dir: {args.dump_dir or 'None (SPARQL endpoint mode)'}")
    logging.info(f"Issues log dir: {args.issues_log_dir}")
    logging.info(f"Checkpoint: {args.checkpoint}")
    logging.info(f"Dry run: {args.dry_run}")
    logging.info(f"Auto-restart enabled: {args.auto_restart_container}")
    logging.info(f"Virtuoso container: {args.virtuoso_container}")

    # --- Start the Virtuoso memory watchdog thread if enabled ---
    if args.auto_restart_container:
        logging.info("Starting Virtuoso memory watchdog thread...")
        start_watchdog_thread(
            container_name=args.virtuoso_container,
            endpoint=args.endpoint
        )
    else:
        logging.info("Auto-restart watchdog disabled.")

    # --- Choose processing mode ---
    if args.dump_dir:
        logging.info("Running pipeline in 'file-based' mode (reading from RDF dumps).")
        fix_process_reading_from_files(
            endpoint=args.endpoint,
            dump_dir=args.dump_dir,
            issues_log_dir=args.issues_log_dir,
            meta_dumps_pub_dates=args.meta_dumps,
            dry_run=args.dry_run,
            checkpoint_fp=args.checkpoint
        )
    else:
        logging.info("Running pipeline in 'SPARQL endpoint' mode.")
        fix_process(
            endpoint=args.endpoint,
            meta_dumps_pub_dates=args.meta_dumps,
            issues_log_dir=args.issues_log_dir or "data_to_fix",
            dry_run=args.dry_run,
            checkpoint_fp=args.checkpoint
        )

    logging.info("Provenance fixing pipeline completed successfully.")
# Run the CLI entry point only when executed as a script (not on import).
if __name__ == "__main__":
    main()
## Detect issues from DB and fix on DB (storing errors in memory only):
## poetry run python meta_prov_fixer/legacy/legacy_main.py -e http://localhost:8890/sparql/ -m meta_dumps.json

## Detect issues from RDF files and fix on DB:
## poetry run python meta_prov_fixer/legacy/legacy_main.py -e http://localhost:8890/sparql/ -m meta_dumps.json -i "./data_to_fix" -d "/meta/dump/directory/"

## Detect issues from RDF files and fix on DB, automatically restarting Virtuoso Docker container if memory usage exceeds 98%
## poetry run python meta_prov_fixer/legacy/legacy_main.py -e http://localhost:8890/sparql/ -m meta_dumps.json -i ./data_to_fix -d "/meta/dump/directory/" --auto-restart-container --virtuoso-container <container_name>