Coverage for oc_meta / run / meta_process.py: 73%
299 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2019 Silvio Peroni <silvio.peroni@unibo.it>
2# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com>
3# SPDX-FileCopyrightText: 2021 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8from __future__ import annotations
10import bisect
11import csv
12import multiprocessing
13import os
14import sys
15import traceback
16from argparse import ArgumentParser
17from datetime import datetime
18from sys import executable, platform
19from typing import Any, Dict, List, Optional, Tuple
21import orjson
22import yaml
23from oc_ocdm import Storer
24from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
25from oc_ocdm.prov import ProvSet
26from oc_ocdm.support.reporter import Reporter
27from piccione.upload.on_triplestore import upload_sparql_updates
28from rich_argparse import RichHelpFormatter
29from time_agnostic_library.support import generate_config_file
31from oc_meta.core.creator import Creator
32from oc_meta.core.curator import Curator
33from oc_meta.lib.console import console, create_progress
34from oc_meta.lib.file_manager import (get_csv_data, init_cache, normalize_path,
35 pathoo, sort_files)
36from oc_meta.lib.timer import ProcessTimer
37from oc_meta.run.benchmark.plotting import plot_incremental_progress
def _upload_to_triplestore(endpoint: str, folder: str, redis_host: str, redis_port: int, redis_db: int, failed_file: str, stop_file: str, description: str = "Processing files") -> None:
    """Upload the SPARQL update files found in *folder* to *endpoint*.

    Used as a child-process target: any failure is printed and the process
    exits with status 1, which the parent observes through ``Process.exitcode``.
    Progress display is turned off (``show_progress=False``).
    """
    upload_kwargs = dict(
        endpoint=endpoint,
        folder=folder,
        failed_file=failed_file,
        stop_file=stop_file,
        redis_host=redis_host,
        redis_port=redis_port,
        redis_db=redis_db,
        description=description,
        show_progress=False,
    )
    try:
        upload_sparql_updates(**upload_kwargs)
    except Exception as exc:
        console.print(f"[red]Upload to {endpoint} failed: {exc}[/red]")
        sys.exit(1)
59def _generate_queries_worker(storer: Storer, triplestore_url: str, base_dir: str) -> None:
60 storer.upload_all(
61 triplestore_url=triplestore_url,
62 base_dir=base_dir,
63 batch_size=10,
64 )
67def _store_rdf_worker(storer: Storer, base_dir, base_iri):
68 storer.store_all(
69 base_dir=base_dir,
70 base_iri=base_iri,
71 )
class MetaProcess:
    """Run the OCMeta data processing workflow over a directory of input CSV files.

    An instance holds the configuration read from the settings dictionary and
    provides the steps used by ``run_meta_process``:

    * choosing which CSV files still need processing;
    * curating each file;
    * creating RDF data and provenance in batches and storing it as JSON-LD files;
    * unless ``rdf_files_only`` is set, generating SPARQL update queries and
      uploading them to the data and provenance triplestores.
    """

    # Number of curated rows handed to each Creator/ProvSet/Storer round. Each
    # round's objects are released before the next round starts, which
    # presumably bounds peak memory for very large input files.
    RDF_BATCH_SIZE = 100_000

    def __init__(self, settings: dict, meta_config_path: str, timer: Optional[ProcessTimer] = None):
        """Read the configuration and set up the shared helpers.

        Args:
            settings: parsed configuration. All keys read with ``[]`` below must
                be present, otherwise ``KeyError`` is raised.
            meta_config_path: path of the configuration file. The
                time-agnostic-library config is kept next to it and is generated
                there if it does not exist yet.
            timer: optional ``ProcessTimer``. A disabled one is used when omitted.
        """
        self.settings = settings
        # Required connection and path settings
        self.triplestore_url = settings["triplestore_url"]  # Main triplestore for data
        self.provenance_triplestore_url = settings["provenance_triplestore_url"]  # Separate triplestore for provenance
        self.input_csv_dir = normalize_path(settings["input_csv_dir"])
        self.base_output_dir = normalize_path(settings["base_output_dir"])
        self.resp_agent = settings["resp_agent"]
        self.output_csv_dir = os.path.join(self.base_output_dir, "csv")
        self.output_rdf_dir = (
            normalize_path(settings["output_rdf_dir"]) + os.sep + "rdf" + os.sep
        )
        self.cache_path = os.path.join(self.base_output_dir, "cache.txt")
        self.errors_path = os.path.join(self.base_output_dir, "errors.txt")
        self.timer = timer or ProcessTimer(enabled=False)
        # Output-layout and curation settings (also read with [], so they must be present)
        self.base_iri = settings["base_iri"]
        self.dir_split_number = settings["dir_split_number"]
        self.items_per_file = settings["items_per_file"]
        self.default_dir = settings["default_dir"]
        self.zip_output_rdf = settings["zip_output_rdf"]
        self.source = settings["source"]
        supplier_prefix: str = settings["supplier_prefix"]
        # The supplier prefix is always stored with a trailing "0"
        self.supplier_prefix = (
            supplier_prefix if supplier_prefix.endswith("0") else f"{supplier_prefix}0"
        )
        self.silencer = settings["silencer"]
        self.rdf_files_only = settings.get("rdf_files_only", False)
        # Time-Agnostic_library integration
        self.time_agnostic_library_config = os.path.join(
            os.path.dirname(meta_config_path), "time_agnostic_library_config.json"
        )
        if not os.path.exists(self.time_agnostic_library_config):
            generate_config_file(
                config_path=self.time_agnostic_library_config,
                dataset_urls=[self.triplestore_url],
                dataset_dirs=list(),
                provenance_urls=[self.provenance_triplestore_url] if self.provenance_triplestore_url not in settings["provenance_endpoints"] else settings["provenance_endpoints"],
                provenance_dirs=list(),
                blazegraph_full_text_search=settings["blazegraph_full_text_search"],
                fuseki_full_text_search=settings["fuseki_full_text_search"],
                virtuoso_full_text_search=settings["virtuoso_full_text_search"],
                graphdb_connector_name=settings["graphdb_connector_name"],
            )

        # Redis settings: redis_db backs the entity counters, redis_cache_db is
        # handed to the SPARQL uploader.
        self.redis_host = settings.get("redis_host", "localhost")
        self.redis_port = settings.get("redis_port", 6379)
        self.redis_db = settings.get("redis_db", 5)
        self.redis_cache_db = settings.get("redis_cache_db", 2)

        self.counter_handler = RedisCounterHandler(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )

        # Triplestore upload settings
        self.ts_failed_queries = settings.get("ts_failed_queries", "failed_queries.txt")
        self.ts_stop_file = settings.get("ts_stop_file", ".stop_upload")

        self.data_update_dir = os.path.join(self.base_output_dir, "to_be_uploaded_data")
        self.prov_update_dir = os.path.join(self.base_output_dir, "to_be_uploaded_prov")

    def prepare_folders(self) -> List[str]:
        """Return the input CSV files that still have to be processed, ordered by ``sort_files``.

        Names returned by ``init_cache(cache_path)`` are excluded. ``pathoo`` is
        called on the output CSV directory (presumably creating it).
        """
        completed = init_cache(self.cache_path)
        files_in_input_csv_dir = {
            filename
            for filename in os.listdir(self.input_csv_dir)
            if filename.endswith(".csv")
        }
        files_to_be_processed = sort_files(
            list(files_in_input_csv_dir.difference(completed))
        )
        pathoo(self.output_csv_dir)
        # NOTE(review): sets the csv module's process-wide field limit to 128
        # characters; presumably get_csv_data raises it again when it meets a
        # longer field — confirm before relying on the csv module elsewhere.
        csv.field_size_limit(128)
        return files_to_be_processed

    def curate_and_create(
        self,
        filename: str,
        cache_path: str,
        errors_path: str,
        settings: dict | None = None,
        meta_config_path: str | None = None,
        progress=None,
    ) -> Tuple[dict, str, str, str]:
        """Curate one input CSV file, then create, store and (optionally) upload its RDF.

        Args:
            filename: name of the CSV file inside ``input_csv_dir``.
            cache_path: cache file path, returned unchanged for ``task_done``.
            errors_path: errors file path, returned unchanged for ``task_done``.
            settings: full configuration, forwarded to ``Curator``. Its optional
                ``min_rows_parallel`` key defaults to 1000.
            meta_config_path: configuration path, forwarded to ``Curator``.
            progress: optional progress display used for per-file sub-tasks.

        Returns:
            ``({"message": "success"}, cache_path, errors_path, filename)`` on
            success. On failure the ``"message"`` value holds the exception type,
            its arguments and the traceback instead. Exceptions are never
            propagated to the caller.
        """
        try:
            with self.timer.timer("total_processing"):
                curator_obj = self._curate_file(filename, settings, meta_config_path, progress)
                self._create_and_store_rdf(curator_obj, filename, progress)
            return {"message": "success"}, cache_path, errors_path, filename
        except Exception as e:
            tb = traceback.format_exc()
            template = (
                "An exception of type {0} occurred. Arguments:\n{1!r}\nTraceback:\n{2}"
            )
            message = template.format(type(e).__name__, e.args, tb)
            return {"message": message}, cache_path, errors_path, filename

    def _curate_file(
        self,
        filename: str,
        settings: dict | None,
        meta_config_path: str | None,
        progress=None,
    ) -> Curator:
        """Read *filename*, run the Curator on it and record the curation metrics."""
        filepath = os.path.join(self.input_csv_dir, filename)
        console.print(filepath)
        data = get_csv_data(filepath)
        self.timer.record_metric("input_records", len(data))

        min_rows_parallel = settings.get("min_rows_parallel", 1000) if settings else 1000
        curator_obj = Curator(
            data=data,
            ts=self.triplestore_url,
            prov_config=self.time_agnostic_library_config,
            counter_handler=self.counter_handler,
            base_iri=self.base_iri,
            prefix=self.supplier_prefix,
            settings=settings,
            silencer=self.silencer,
            meta_config_path=meta_config_path,
            timer=self.timer,
            progress=progress,
            min_rows_parallel=min_rows_parallel,
        )
        # Strip only the trailing extension so a ".csv" inside the stem is kept.
        name = f"{filename.removesuffix('.csv')}_{datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}"
        curator_obj.curator(
            filename=name, path_csv=self.output_csv_dir
        )
        self.timer.record_metric("curated_records", len(curator_obj.data))
        self.timer.record_metric("local_g_triples", len(curator_obj.finder.graph))
        self.timer.record_metric("preexisting_entities_count", len(curator_obj.preexisting_entities))
        return curator_obj

    def _create_and_store_rdf(self, curator_obj: Curator, filename: str, progress=None) -> None:
        """Turn the curated rows into RDF in batches of ``RDF_BATCH_SIZE`` and store each batch.

        Records the ``entities_created`` and ``modified_entities`` metrics. When a
        progress display is given and more than one batch is needed, a per-file
        "RDF batches" task is shown. That task is removed even if a batch fails.
        """
        data = curator_obj.data
        batch_size = self.RDF_BATCH_SIZE
        n_batches = (len(data) + batch_size - 1) // batch_size
        total_entities = 0
        total_modified = 0

        batch_task_id = None
        if progress is not None and n_batches > 1:
            batch_task_id = progress.add_task(
                f" [cyan]RDF batches[/cyan] ({filename})",
                total=n_batches,
            )

        try:
            for batch_start in range(0, len(data), batch_size):
                batch_data = data[batch_start:batch_start + batch_size]
                entities, modified = self._create_rdf_batch(curator_obj, batch_data, progress)
                total_entities += entities
                total_modified += modified
                if batch_task_id is not None:
                    progress.update(batch_task_id, advance=1)
        finally:
            # Remove the sub-task even when a batch raises, so a failed file does
            # not leave a stale bar in the shared progress display.
            if batch_task_id is not None:
                progress.remove_task(batch_task_id)

        self.timer.record_metric("entities_created", total_entities)
        self.timer.record_metric("modified_entities", total_modified)

    def _create_rdf_batch(self, curator_obj: Curator, batch_data: list, progress=None) -> Tuple[int, int]:
        """Create data and provenance for one batch of curated rows, then store/upload them.

        Returns:
            ``(entities, modified)``: the number of entities in the Creator's
            ``res_to_entity`` and the number of entities returned by
            ``ProvSet.generate_provenance``. All per-batch objects become
            unreachable from here once this method returns.
        """
        with self.timer.timer("rdf_creation"):
            creator_obj = Creator(
                data=batch_data,
                finder=curator_obj.finder,
                base_iri=self.base_iri,
                counter_handler=self.counter_handler,
                supplier_prefix=self.supplier_prefix,
                resp_agent=self.resp_agent,
                ra_index=curator_obj.index_id_ra,
                br_index=curator_obj.index_id_br,
                re_index_csv=curator_obj.re_index,
                ar_index_csv=curator_obj.ar_index,
                vi_index=curator_obj.VolIss,
                silencer=self.silencer,
                progress=progress,
            )
            creator = creator_obj.creator(source=self.source)
            entities = len(creator.res_to_entity)

            prov = ProvSet(
                creator,
                self.base_iri,
                wanted_label=False,
                supplier_prefix=self.supplier_prefix,
                custom_counter_handler=self.counter_handler,
            )
            modified_entities = prov.generate_provenance()
            modified = len(modified_entities)

        repok = Reporter(print_sentences=False)
        reperr = Reporter(print_sentences=True, prefix="[Storer: ERROR] ")
        res_storer = Storer(
            abstract_set=creator,
            repok=repok,
            reperr=reperr,
            dir_split=self.dir_split_number,
            n_file_item=self.items_per_file,
            default_dir=self.default_dir,
            output_format="json-ld",
            zip_output=self.zip_output_rdf,
            modified_entities=modified_entities,
        )
        prov_storer = Storer(
            abstract_set=prov,
            repok=repok,
            reperr=reperr,
            dir_split=self.dir_split_number,
            n_file_item=self.items_per_file,
            output_format="json-ld",
            zip_output=self.zip_output_rdf,
            modified_entities=modified_entities,
        )
        self.store_data_and_prov(res_storer, prov_storer)
        return entities, modified

    def _setup_output_directories(self) -> None:
        """Create output directories for data and provenance."""
        os.makedirs(self.data_update_dir, exist_ok=True)
        os.makedirs(self.prov_update_dir, exist_ok=True)

    @staticmethod
    def _start_and_join(processes: List[Any]) -> None:
        """Start every process in *processes*, then wait for all of them.

        If starting one process raises (for example while pickling its
        arguments), the processes already started are still joined before the
        error propagates, so no worker is left running unattended.
        """
        started: List[Any] = []
        try:
            for process in processes:
                process.start()
                started.append(process)
        finally:
            for process in started:
                process.join()

    def _upload_sparql_queries(self) -> None:
        """Upload the generated data and provenance SPARQL queries in two parallel processes.

        Raises:
            RuntimeError: if either upload process exits with a non-zero code.
        """
        data_upload_folder = os.path.join(self.data_update_dir, "to_be_uploaded")
        prov_upload_folder = os.path.join(self.prov_update_dir, "to_be_uploaded")

        # Use forkserver to avoid deadlocks when forking from a multi-threaded process.
        # Libraries like Redis and rdflib create background threads, and fork() would
        # copy locked mutexes into the child process, causing hangs.
        ctx = multiprocessing.get_context('forkserver')

        data_process = ctx.Process(
            target=_upload_to_triplestore,
            args=(
                self.triplestore_url,
                data_upload_folder,
                self.redis_host,
                self.redis_port,
                self.redis_cache_db,
                self.ts_failed_queries,
                self.ts_stop_file,
                "Uploading data SPARQL"
            )
        )

        prov_process = ctx.Process(
            target=_upload_to_triplestore,
            args=(
                self.provenance_triplestore_url,
                prov_upload_folder,
                self.redis_host,
                self.redis_port,
                self.redis_cache_db,
                self.ts_failed_queries,
                self.ts_stop_file,
                "Uploading prov SPARQL"
            )
        )

        self._start_and_join([data_process, prov_process])

        if data_process.exitcode != 0:
            raise RuntimeError(f"Data upload failed with exit code {data_process.exitcode}")
        if prov_process.exitcode != 0:
            raise RuntimeError(f"Provenance upload failed with exit code {prov_process.exitcode}")

    def store_data_and_prov(
        self, res_storer: Storer, prov_storer: Storer
    ) -> None:
        """Store one batch's data and provenance and, unless ``rdf_files_only`` is set, upload it."""
        if not self.rdf_files_only:
            self._setup_output_directories()
        self._store_and_upload(res_storer, prov_storer, self.timer)

    def _generate_update_queries(self, ctx: Any, res_storer: Storer, prov_storer: Storer) -> None:
        """Generate the data and provenance update queries in two parallel processes.

        Raises:
            RuntimeError: if either generation process exits with a non-zero code.
        """
        data_query_process = ctx.Process(
            target=_generate_queries_worker,
            args=(res_storer, self.triplestore_url, self.data_update_dir)
        )
        prov_query_process = ctx.Process(
            target=_generate_queries_worker,
            args=(prov_storer, self.provenance_triplestore_url, self.prov_update_dir)
        )
        self._start_and_join([data_query_process, prov_query_process])

        if data_query_process.exitcode != 0:
            raise RuntimeError(f"Data query generation failed with exit code {data_query_process.exitcode}")
        if prov_query_process.exitcode != 0:
            raise RuntimeError(f"Prov query generation failed with exit code {prov_query_process.exitcode}")

    def _store_and_upload(
        self, res_storer: Storer, prov_storer: Storer, timer: ProcessTimer
    ) -> None:
        """Store RDF files and upload queries to triplestore with parallel execution.

        Both RDF-storage processes run while queries are generated and uploaded.
        They are always joined, even when generation or upload raises. Storage
        exit codes are checked only after both processes have finished.

        Raises:
            RuntimeError: if query generation, upload or RDF storage fails.
        """
        with timer.timer("storage"):
            # Use forkserver to avoid deadlocks when forking from a multi-threaded process.
            # Libraries like Redis and rdflib create background threads, and fork() would
            # copy locked mutexes into the child process, causing hangs.
            ctx = multiprocessing.get_context('forkserver')

            rdf_store_processes = [
                ctx.Process(
                    target=_store_rdf_worker,
                    args=(res_storer, self.output_rdf_dir, self.base_iri)
                ),
                ctx.Process(
                    target=_store_rdf_worker,
                    args=(prov_storer, self.output_rdf_dir, self.base_iri)
                ),
            ]
            started_stores: List[Any] = []
            try:
                for p in rdf_store_processes:
                    p.start()
                    started_stores.append(p)

                if not self.rdf_files_only:
                    self._generate_update_queries(ctx, res_storer, prov_storer)
                    self._upload_sparql_queries()
            finally:
                # Reap the storage workers on every path, so a failure above does
                # not leave them running or unjoined.
                for p in started_stores:
                    p.join()

            for p in rdf_store_processes:
                if p.exitcode != 0:
                    raise RuntimeError(f"RDF storage failed with exit code {p.exitcode}")

    def run_sparql_updates(self, endpoint: str, folder: str):
        """Upload the SPARQL update files in *folder* to *endpoint* in the current process."""
        upload_sparql_updates(
            endpoint=endpoint,
            folder=folder,
            failed_file=self.ts_failed_queries,
            stop_file=self.ts_stop_file,
            redis_host=self.redis_host,
            redis_port=self.redis_port,
            redis_db=self.redis_cache_db,
        )
def _save_incremental_report(all_reports: List[Dict[str, Any]], meta_config_path: str, output_path: str) -> None:
    """Write the per-file timing reports gathered so far, plus their aggregate, to *output_path*.

    The file is overwritten on every call with indented JSON produced by orjson.
    """
    snapshot = dict(
        timestamp=datetime.now().isoformat(),
        config_path=meta_config_path,
        total_files_processed=len(all_reports),
        files=all_reports,
        aggregate=_compute_aggregate_metrics(all_reports),
    )
    with open(output_path, 'wb') as out_file:
        out_file.write(orjson.dumps(snapshot, option=orjson.OPT_INDENT_2))
419def _get_file_peak_memory(report: Dict[str, Any]) -> float:
420 """Get peak memory (MB) across all phases in a file report."""
421 phases = report["report"]["phases"]
422 peaks = [p["peak_memory_mb"] for p in phases if p["peak_memory_mb"]]
423 return max(peaks) if peaks else 0
426def _compute_aggregate_metrics(all_reports: List[Dict[str, Any]]) -> Dict[str, Any]:
427 """Compute aggregate statistics across all file reports."""
428 if not all_reports:
429 return {}
431 total_duration = sum(r["report"]["metrics"].get("total_duration_seconds", 0) for r in all_reports)
432 total_records = sum(r["report"]["metrics"].get("input_records", 0) for r in all_reports)
433 total_entities = sum(r["report"]["metrics"].get("entities_created", 0) for r in all_reports)
435 durations = [r["report"]["metrics"].get("total_duration_seconds", 0) for r in all_reports]
436 throughputs = [r["report"]["metrics"].get("throughput_records_per_sec", 0) for r in all_reports]
438 file_peaks = [_get_file_peak_memory(r) for r in all_reports]
439 non_zero_peaks = [p for p in file_peaks if p]
441 result: Dict[str, Any] = {
442 "total_files": len(all_reports),
443 "total_duration_seconds": round(total_duration, 3),
444 "total_records_processed": total_records,
445 "total_entities_created": total_entities,
446 "average_time_per_file": round(total_duration / len(all_reports), 3) if all_reports else 0,
447 "average_throughput": round(sum(throughputs) / len(throughputs), 2) if throughputs else 0,
448 "min_time": round(min(durations), 3) if durations else 0,
449 "max_time": round(max(durations), 3) if durations else 0,
450 "overall_throughput": round(total_records / total_duration, 2) if total_duration > 0 else 0,
451 }
452 if non_zero_peaks:
453 result["peak_memory_mb"] = round(max(non_zero_peaks), 1)
454 result["average_peak_memory_mb"] = round(sum(non_zero_peaks) / len(non_zero_peaks), 1)
455 return result
def _print_aggregate_summary(all_reports: List[Dict[str, Any]]) -> None:
    """Print the run-wide timing statistics to the console.

    The two memory lines appear only when a peak-memory value was recorded.
    """
    agg = _compute_aggregate_metrics(all_reports)
    separator = "=" * 60

    console.print(f"\n{separator}")
    console.print("[bold]Aggregate Timing Summary[/bold]")
    console.print(separator)
    summary_lines = [
        f"Total Files: {agg['total_files']}",
        f"Total Duration: {agg['total_duration_seconds']}s",
        f"Total Records: {agg['total_records_processed']}",
        f"Total Entities: {agg['total_entities_created']}",
        f"Average Time/File: {agg['average_time_per_file']}s",
        f"Min/Max Time: {agg['min_time']}s / {agg['max_time']}s",
        f"Overall Throughput: {agg['overall_throughput']} rec/s",
    ]
    if "peak_memory_mb" in agg:
        summary_lines += [
            f"Peak Memory (RSS): {agg['peak_memory_mb']} MB",
            f"Avg Peak Memory: {agg['average_peak_memory_mb']} MB",
        ]
    for line in summary_lines:
        console.print(line)
    console.print(f"{separator}\n")
def _make_phase_callback(
    reports_so_far: List[Dict[str, Any]],
    filename: str,
    meta_config_path: str,
    timing_output: str,
    include_storage: bool,
):
    """Build the ``on_phase_complete`` hook that refreshes the incremental timing report and chart.

    The JSON report is written to *timing_output*. The chart is written to
    ``<timing_output without extension>_chart.png``, so the two files never
    share a path, even when *timing_output* has no ``.json`` extension.
    *reports_so_far* is the live list of completed file reports; it is not mutated.
    """
    root, _ext = os.path.splitext(timing_output)
    chart_path = f"{root}_chart.png"

    def _on_phase(timer: ProcessTimer) -> None:
        snapshot = reports_so_far + [{"filename": filename, "report": timer.get_report()}]
        _save_incremental_report(snapshot, meta_config_path, timing_output)
        plot_incremental_progress(snapshot, chart_path, include_storage=include_storage)

    return _on_phase


def run_meta_process(
    settings: dict, meta_config_path: str, enable_timing: bool = False, timing_output: Optional[str] = None
) -> None:
    """Process every pending input CSV file described by *settings*, one file at a time.

    Args:
        settings: parsed configuration, passed to ``MetaProcess``.
        meta_config_path: path of that configuration file.
        enable_timing: collect per-file metrics with ``ProcessTimer`` and print
            an aggregate summary at the end.
        timing_output: optional JSON path for the timing report. While timing is
            enabled, a partial report and a progress chart are rewritten after
            every timed phase.

    Processing halts before the next file once a ``.stop`` file exists in the
    base output directory. An error while handling one file is printed, and the
    loop continues with the next file. If the run was not stopped, the cache
    file is renamed with a timestamp suffix. On Unix-like platforms, ``.lock``
    files under the base output directory are then deleted.
    """
    is_unix = platform in {"linux", "linux2", "darwin"}
    all_reports: List[Dict[str, Any]] = []

    meta_process_setup = MetaProcess(settings=settings, meta_config_path=meta_config_path)
    files_to_be_processed = meta_process_setup.prepare_folders()
    stop_file = os.path.join(meta_process_setup.base_output_dir, ".stop")

    generate_gentle_buttons(meta_process_setup.base_output_dir, meta_config_path, is_unix)

    with create_progress() as progress:
        task_id = progress.add_task("Processing files", total=len(files_to_be_processed))
        for idx, filename in enumerate(files_to_be_processed, 1):
            # Checked outside the try/finally so halting does not advance the
            # progress bar for a file that was never processed.
            if os.path.exists(stop_file):
                console.print("\n[yellow]Stop file detected. Halting processing.[/yellow]")
                break
            try:
                if enable_timing:
                    console.print(f"\n[cyan][{idx}/{len(files_to_be_processed)}][/cyan] Processing {filename}...")

                on_phase_cb = None
                if enable_timing and timing_output:
                    on_phase_cb = _make_phase_callback(
                        all_reports,
                        filename,
                        meta_config_path,
                        timing_output,
                        include_storage=not settings.get("rdf_files_only", False),
                    )

                file_timer = ProcessTimer(enabled=enable_timing, verbose=enable_timing, on_phase_complete=on_phase_cb)
                meta_process_setup.timer = file_timer

                result = meta_process_setup.curate_and_create(
                    filename,
                    meta_process_setup.cache_path,
                    meta_process_setup.errors_path,
                    settings=settings,
                    meta_config_path=meta_config_path,
                    progress=progress,
                )
                task_done(result)

                if enable_timing:
                    all_reports.append({
                        "filename": filename,
                        "report": file_timer.get_report()
                    })
                    file_timer.print_file_summary(filename)

            except Exception as e:
                traceback_str = traceback.format_exc()
                console.print(
                    f"[red]Error processing file {filename}: {e}\nTraceback:\n{traceback_str}[/red]"
                )
            finally:
                progress.advance(task_id)

    if not os.path.exists(stop_file):
        if os.path.exists(meta_process_setup.cache_path):
            # Archive the finished run's cache so the next run starts fresh.
            os.rename(
                meta_process_setup.cache_path,
                meta_process_setup.cache_path.replace(
                    ".txt", f'_{datetime.now().strftime("%Y-%m-%dT%H_%M_%S_%f")}.txt'
                ),
            )
        if is_unix:
            delete_lock_files(base_dir=meta_process_setup.base_output_dir)

    if enable_timing and all_reports:
        _print_aggregate_summary(all_reports)
        if timing_output:
            aggregate_report = {
                "timestamp": datetime.now().isoformat(),
                "config_path": meta_config_path,
                "total_files": len(all_reports),
                "files": all_reports,
                "aggregate": _compute_aggregate_metrics(all_reports)
            }
            with open(timing_output, 'wb') as f:
                f.write(orjson.dumps(aggregate_report, option=orjson.OPT_INDENT_2))
            console.print(f"[green][Timing] Report saved to {timing_output}[/green]")
566def _cache_sort_key(filename: str) -> int:
567 return int(filename.replace(".csv", ""))
def task_done(task_output: tuple) -> None:
    """Record the outcome returned by ``MetaProcess.curate_and_create``.

    *task_output* is ``(message_dict, cache_path, errors_path, filename)``. The
    outcome is handled as follows:

    * ``"skip"`` is ignored;
    * ``"success"`` adds *filename* to the cache file, kept in numeric order
      where the names allow it;
    * any other message is appended to the errors file as ``"<filename>: <message>"``.
    """
    outcome, cache_path, errors_path, filename = task_output
    status = outcome["message"]

    if status == "skip":
        return

    if status != "success":
        with open(errors_path, "a", encoding="utf-8") as err_file:
            err_file.write(f"{filename}: {status}\n")
        return

    if not os.path.exists(cache_path):
        with open(cache_path, "w", encoding="utf-8") as cache_file:
            cache_file.write(filename + "\n")
        return

    with open(cache_path, "r", encoding="utf-8") as cache_file:
        entries = cache_file.read().splitlines()
    try:
        bisect.insort(entries, filename, key=_cache_sort_key)
    except ValueError:
        # A non-numeric name (e.g. "data.csv") cannot be ordered, so it goes at the end.
        entries.append(filename)
    with open(cache_path, "w", encoding="utf-8") as cache_file:
        cache_file.write("\n".join(entries))
def delete_lock_files(base_dir: str) -> None:
    """Recursively delete every file whose name ends in ``.lock`` under *base_dir*."""
    for root, _subdirs, files in os.walk(base_dir):
        for lock_name in (name for name in files if name.endswith(".lock")):
            os.remove(os.path.join(root, lock_name))
def generate_gentle_buttons(dir: str, config: str, is_unix: bool):
    """Write the ``gently_run`` and ``gently_stop`` helper scripts into the current directory.

    A leftover ``.stop`` file in *dir* is removed first. ``gently_run`` calls
    ``oc_meta.lib.stopper --remove`` on *dir* and then restarts
    ``oc_meta.run.meta_process`` with *config*. ``gently_stop`` calls
    ``oc_meta.lib.stopper --add`` on *dir*, presumably creating the ``.stop``
    file that ``run_meta_process`` checks between files.

    Args:
        dir: directory handed to the stopper (the base output directory).
        config: path of the configuration file to restart with.
        is_unix: write ``.sh`` scripts when true, ``.bat`` otherwise.
    """
    stop_marker = os.path.join(dir, ".stop")
    if os.path.exists(stop_marker):
        os.remove(stop_marker)
    ext = "sh" if is_unix else "bat"
    # Quote the interpreter and config paths as well as *dir*: any of them may
    # contain spaces (e.g. "C:\Program Files\..."), which would otherwise split
    # into several arguments.
    python = f'"{executable}"'
    with open(f"gently_run.{ext}", "w") as rsh:
        rsh.write(
            f'{python} -m oc_meta.lib.stopper -t "{dir}" --remove\n{python} -m oc_meta.run.meta_process -c "{config}"'
        )
    with open(f"gently_stop.{ext}", "w") as rsh:
        rsh.write(f'{python} -m oc_meta.lib.stopper -t "{dir}" --add')
if __name__ == "__main__":  # pragma: no cover
    # Command-line entry point: load the YAML configuration and run the whole workflow.
    arg_parser = ArgumentParser(
        "meta_process.py",
        description="This script runs the OCMeta data processing workflow",
        formatter_class=RichHelpFormatter,
    )
    arg_parser.add_argument(
        "-c",
        "--config",
        dest="config",
        required=True,
        help="Path to the YAML configuration file",
    )
    arg_parser.add_argument(
        "--timing",
        action="store_true",
        help="Enable timing metrics collection and display summary at the end",
    )
    arg_parser.add_argument(
        "--timing-output",
        dest="timing_output",
        default=None,
        help="Optional path to save timing report as JSON file",
    )
    args = arg_parser.parse_args()
    # NOTE(review): yaml.full_load can construct some Python objects; if the
    # configuration only uses plain YAML types, yaml.safe_load would be the
    # safer loader — confirm before switching.
    with open(args.config, encoding="utf-8") as file:
        settings = yaml.full_load(file)
    run_meta_process(
        settings=settings,
        meta_config_path=args.config,
        enable_timing=args.timing,
        timing_output=args.timing_output
    )