Coverage for oc_meta / run / meta_process.py: 77%
305 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright 2019 Silvio Peroni <essepuntato@gmail.com>
4# Copyright 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com>
5# Copyright 2021 Simone Persiani <iosonopersia@gmail.com>
6# Copyright 2021-2022 Arcangelo Massari <arcangelo.massari@unibo.it>
7#
8# Permission to use, copy, modify, and/or distribute this software for any purpose
9# with or without fee is hereby granted, provided that the above copyright notice
10# and this permission notice appear in all copies.
11#
12# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
13# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
14# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
15# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
16# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
18# SOFTWARE.
20from __future__ import annotations
22import csv
23import glob
24import json
25import multiprocessing
26import os
27import sys
28import traceback
29from argparse import ArgumentParser
30from datetime import datetime
31from sys import executable, platform
32from typing import Any, Dict, Iterator, List, Optional, Tuple
34import redis
35import yaml
36from oc_ocdm import Storer
37from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
38from oc_ocdm.prov import ProvSet
39from oc_ocdm.support.reporter import Reporter
40from piccione.upload.on_triplestore import upload_sparql_updates
41from time_agnostic_library.support import generate_config_file
42from tqdm import tqdm
43from virtuoso_utilities.bulk_load import bulk_load
45from oc_meta.core.creator import Creator
46from oc_meta.core.curator import Curator
47from oc_meta.lib.file_manager import (get_csv_data, init_cache, normalize_path,
48 pathoo, sort_files)
49from oc_meta.lib.timer import ProcessTimer
50from oc_meta.run.benchmark.plotting import plot_incremental_progress
class BulkLoadError(Exception):
    """Signals that a Virtuoso bulk-load step did not complete successfully."""
def _upload_to_triplestore(endpoint: str, folder: str, redis_host: str, redis_port: int, redis_db: int, failed_file: str, stop_file: str, description: str = "Processing files") -> None:
    """Push the SPARQL update files found in *folder* to *endpoint*.

    Any failure is reported on stderr and the process terminates with exit
    code 1, so a parent process can detect the failure via ``exitcode``.
    """
    upload_options = dict(
        endpoint=endpoint,
        folder=folder,
        failed_file=failed_file,
        stop_file=stop_file,
        redis_host=redis_host,
        redis_port=redis_port,
        redis_db=redis_db,
        description=description,
        show_progress=False,
    )
    try:
        upload_sparql_updates(**upload_options)
    except Exception as exc:
        print(f"Upload to {endpoint} failed: {exc}", file=sys.stderr)
        sys.exit(1)
77def _run_bulk_load_process(args: tuple) -> None:
78 """
79 Worker function to execute Virtuoso bulk load in a separate process.
80 """
81 container_name, bulk_load_dir, nquads_host_dir = args
83 nquads_files = glob.glob(os.path.join(nquads_host_dir, "*.nq.gz"))
84 if not nquads_files:
85 return
87 virtuoso_password = "dba"
89 try:
90 bulk_load(
91 data_directory=nquads_host_dir,
92 password=virtuoso_password,
93 docker_container=container_name,
94 container_data_directory=bulk_load_dir,
95 log_level="CRITICAL"
96 )
97 except RuntimeError as e:
98 raise BulkLoadError(f"Bulk load failed for container {container_name}: {e}") from e
101def _generate_queries_worker(storer: Storer, triplestore_url, base_dir, save_queries, prepare_bulk_load, bulk_load_dir):
102 storer.upload_all(
103 triplestore_url=triplestore_url,
104 base_dir=base_dir,
105 batch_size=10,
106 save_queries=save_queries,
107 prepare_bulk_load=prepare_bulk_load,
108 bulk_load_dir=bulk_load_dir
109 )
112def _store_rdf_worker(storer: Storer, base_dir, base_iri, context_path):
113 storer.store_all(
114 base_dir=base_dir,
115 base_iri=base_iri,
116 context_path=context_path
117 )
class MetaProcess:
    """Drives one OpenCitations Meta ingestion run.

    For each input CSV file the workflow is: curate the raw metadata
    (``Curator``), mint RDF entities (``Creator``), generate provenance
    (``ProvSet``), then persist the graphs to disk and/or upload them to the
    configured triplestores. Entity counters are kept in Redis via
    ``RedisCounterHandler``.
    """

    def __init__(self, settings: dict, meta_config_path: str, timer: Optional[ProcessTimer] = None):
        """Derive every path and option used by the run from ``settings``.

        :param settings: parsed YAML configuration; missing mandatory keys
            raise ``KeyError`` immediately.
        :param meta_config_path: path of the YAML config file; the
            time-agnostic-library config is generated next to it.
        :param timer: optional ``ProcessTimer``; defaults to a disabled timer
            so all timing calls become no-ops.
        """
        self.settings = settings
        # Mandatory settings
        self.triplestore_url = settings["triplestore_url"]  # Main triplestore for data
        self.provenance_triplestore_url = settings["provenance_triplestore_url"]  # Separate triplestore for provenance
        self.input_csv_dir = normalize_path(settings["input_csv_dir"])
        self.base_output_dir = normalize_path(settings["base_output_dir"])
        self.resp_agent = settings["resp_agent"]
        self.info_dir = os.path.join(self.base_output_dir, "info_dir")
        self.output_csv_dir = os.path.join(self.base_output_dir, "csv")
        self.output_rdf_dir = (
            normalize_path(settings["output_rdf_dir"]) + os.sep + "rdf" + os.sep
        )
        self.cache_path = os.path.join(self.base_output_dir, "cache.txt")
        self.errors_path = os.path.join(self.base_output_dir, "errors.txt")
        self.timer = timer or ProcessTimer(enabled=False)
        # Optional settings
        self.base_iri = settings["base_iri"]
        self.normalize_titles = settings.get("normalize_titles", True)
        self.context_path = settings["context_path"]
        self.dir_split_number = settings["dir_split_number"]
        self.items_per_file = settings["items_per_file"]
        self.default_dir = settings["default_dir"]
        self.zip_output_rdf = settings["zip_output_rdf"]
        self.source = settings["source"]
        # DOI-validation cache is only kept when the API service is enabled;
        # None disables caching entirely.
        self.valid_dois_cache = (
            dict() if bool(settings["use_doi_api_service"]) == True else None
        )
        # Supplier prefixes conventionally end with "0"; append it if absent.
        supplier_prefix: str = settings["supplier_prefix"]
        self.supplier_prefix = (
            supplier_prefix if supplier_prefix.endswith("0") else f"{supplier_prefix}0"
        )
        self.silencer = settings["silencer"]
        self.generate_rdf_files = settings.get("generate_rdf_files", True)
        # Time-Agnostic_library integration: the config file is generated once,
        # next to the Meta config, and reused on subsequent runs.
        self.time_agnostic_library_config = os.path.join(
            os.path.dirname(meta_config_path), "time_agnostic_library_config.json"
        )
        if not os.path.exists(self.time_agnostic_library_config):
            generate_config_file(
                config_path=self.time_agnostic_library_config,
                dataset_urls=[self.triplestore_url],
                dataset_dirs=list(),
                provenance_urls=[self.provenance_triplestore_url] if self.provenance_triplestore_url not in settings["provenance_endpoints"] else settings["provenance_endpoints"],
                provenance_dirs=list(),
                blazegraph_full_text_search=settings["blazegraph_full_text_search"],
                fuseki_full_text_search=settings["fuseki_full_text_search"],
                virtuoso_full_text_search=settings["virtuoso_full_text_search"],
                graphdb_connector_name=settings["graphdb_connector_name"],
            )

        # Redis settings: redis_db holds entity counters, redis_cache_db
        # tracks upload state for upload_sparql_updates.
        self.redis_host = settings.get("redis_host", "localhost")
        self.redis_port = settings.get("redis_port", 6379)
        self.redis_db = settings.get("redis_db", 5)
        self.redis_cache_db = settings.get("redis_cache_db", 2)
        self.redis_client = redis.Redis(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )

        self.counter_handler = RedisCounterHandler(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )

        # Triplestore upload settings
        self.ts_failed_queries = settings.get("ts_failed_queries", "failed_queries.txt")
        self.ts_stop_file = settings.get("ts_stop_file", ".stop_upload")

        # Staging directories for generated SPARQL update files.
        self.data_update_dir = os.path.join(self.base_output_dir, "to_be_uploaded_data")
        self.prov_update_dir = os.path.join(self.base_output_dir, "to_be_uploaded_prov")

    def prepare_folders(self) -> List[str]:
        """Return the input CSV filenames not yet processed (per the cache
        file), sorted, and ensure the output CSV directory exists."""
        completed = init_cache(self.cache_path)
        files_in_input_csv_dir = {
            filename
            for filename in os.listdir(self.input_csv_dir)
            if filename.endswith(".csv")
        }
        files_to_be_processed = sort_files(
            list(files_in_input_csv_dir.difference(completed))
        )
        pathoo(self.output_csv_dir)
        # NOTE(review): 128 caps every CSV field at 128 characters — the csv
        # module default is 131072 and this limit is normally *raised* (e.g.
        # to sys.maxsize), not lowered. Confirm this value is intentional.
        csv.field_size_limit(128)
        return files_to_be_processed

    def curate_and_create(
        self,
        filename: str,
        cache_path: str,
        errors_path: str,
        settings: dict | None = None,
        meta_config_path: str | None = None,
    ) -> Tuple[dict, str, str, str]:
        """Process one CSV file end to end.

        :return: ``(message, cache_path, errors_path, filename)`` where
            ``message["message"]`` is ``"success"`` or the formatted error
            text; ``task_done`` consumes this tuple.
        :raises BulkLoadError: re-raised (not swallowed) so a bulk-load
            failure aborts the whole run.
        """
        try:
            with self.timer.timer("total_processing"):
                filepath = os.path.join(self.input_csv_dir, filename)
                print(filepath)
                data = get_csv_data(filepath)
                self.timer.record_metric("input_records", len(data))

                # NOTE(review): this mutates instance state — calling the
                # method twice on the same instance appends the supplier
                # prefix to info_dir repeatedly. Safe today because the driver
                # builds a fresh MetaProcess per file; confirm before reuse.
                self.info_dir = os.path.join(self.info_dir, self.supplier_prefix)
                curator_obj = Curator(
                    data=data,
                    ts=self.triplestore_url,
                    prov_config=self.time_agnostic_library_config,
                    counter_handler=self.counter_handler,
                    base_iri=self.base_iri,
                    prefix=self.supplier_prefix,
                    valid_dois_cache=self.valid_dois_cache,
                    settings=settings,
                    silencer=self.silencer,
                    meta_config_path=meta_config_path,
                    timer=self.timer,
                )
                # Timestamped name keeps repeated runs of the same CSV apart.
                name = f"{filename.replace('.csv', '')}_{datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}"
                curator_obj.curator(
                    filename=name, path_csv=self.output_csv_dir
                )
                self.timer.record_metric("curated_records", len(curator_obj.data))

                with self.timer.timer("rdf_creation"):
                    local_g_size = len(curator_obj.everything_everywhere_allatonce)
                    self.timer.record_metric("local_g_triples", local_g_size)
                    preexisting_count = len(curator_obj.preexisting_entities)
                    self.timer.record_metric("preexisting_entities_count", preexisting_count)

                    with self.timer.timer("creator_execution"):
                        creator_obj = Creator(
                            data=curator_obj.data,
                            finder=curator_obj.finder,
                            base_iri=self.base_iri,
                            counter_handler=self.counter_handler,
                            supplier_prefix=self.supplier_prefix,
                            resp_agent=self.resp_agent,
                            ra_index=curator_obj.index_id_ra,
                            br_index=curator_obj.index_id_br,
                            re_index_csv=curator_obj.re_index,
                            ar_index_csv=curator_obj.ar_index,
                            vi_index=curator_obj.VolIss,
                            # NOTE(review): reads the `settings` argument (which
                            # defaults to None), not self.silencer — raises
                            # AttributeError if settings is None. Confirm.
                            silencer=settings.get("silencer", []),
                        )
                        creator = creator_obj.creator(source=self.source)
                        self.timer.record_metric("entities_created", len(creator.res_to_entity))

                    with self.timer.timer("provenance_generation"):
                        prov = ProvSet(
                            creator,
                            self.base_iri,
                            wanted_label=False,
                            supplier_prefix=self.supplier_prefix,
                            custom_counter_handler=self.counter_handler,
                        )
                        modified_entities = prov.generate_provenance()
                        self.timer.record_metric("modified_entities", len(modified_entities))

                    repok = Reporter(print_sentences=False)
                    reperr = Reporter(print_sentences=True, prefix="[Storer: ERROR] ")
                    # Only entities touched in this run are written out.
                    res_storer = Storer(
                        abstract_set=creator,
                        repok=repok,
                        reperr=reperr,
                        context_map={},
                        dir_split=self.dir_split_number,
                        n_file_item=self.items_per_file,
                        default_dir=self.default_dir,
                        output_format="json-ld",
                        zip_output=self.zip_output_rdf,
                        modified_entities=modified_entities,
                    )
                    prov_storer = Storer(
                        abstract_set=prov,
                        repok=repok,
                        reperr=reperr,
                        context_map={},
                        dir_split=self.dir_split_number,
                        n_file_item=self.items_per_file,
                        output_format="json-ld",
                        zip_output=self.zip_output_rdf,
                        modified_entities=modified_entities,
                    )
                    self.store_data_and_prov(res_storer, prov_storer)

            return {"message": "success"}, cache_path, errors_path, filename
        except Exception as e:
            tb = traceback.format_exc()
            template = (
                "An exception of type {0} occurred. Arguments:\n{1!r}\nTraceback:\n{2}"
            )
            message = template.format(type(e).__name__, e.args, tb)
            # Bulk-load failures are fatal: re-raise so the run stops instead
            # of being recorded as a per-file error.
            if isinstance(e, BulkLoadError):
                print(message)
                raise
            return {"message": message}, cache_path, errors_path, filename

    def _setup_output_directories(self) -> None:
        """Create output directories for data and provenance."""
        os.makedirs(self.data_update_dir, exist_ok=True)
        os.makedirs(self.prov_update_dir, exist_ok=True)

    def _upload_sparql_queries(self) -> None:
        """Upload SPARQL queries to triplestores in parallel using fork.

        Data and provenance uploads run in two forked child processes; a
        non-zero exitcode from either (the worker calls ``sys.exit(1)`` on
        failure) raises ``RuntimeError`` here.
        """
        data_upload_folder = os.path.join(self.data_update_dir, "to_be_uploaded")
        prov_upload_folder = os.path.join(self.prov_update_dir, "to_be_uploaded")

        # 'fork' keeps the (unpicklable) arguments available in the children.
        ctx = multiprocessing.get_context('fork')

        data_process = ctx.Process(
            target=_upload_to_triplestore,
            args=(
                self.triplestore_url,
                data_upload_folder,
                self.redis_host,
                self.redis_port,
                self.redis_cache_db,
                self.ts_failed_queries,
                self.ts_stop_file,
                "Uploading data SPARQL"
            )
        )

        prov_process = ctx.Process(
            target=_upload_to_triplestore,
            args=(
                self.provenance_triplestore_url,
                prov_upload_folder,
                self.redis_host,
                self.redis_port,
                self.redis_cache_db,
                self.ts_failed_queries,
                self.ts_stop_file,
                "Uploading prov SPARQL"
            )
        )

        data_process.start()
        prov_process.start()

        data_process.join()
        prov_process.join()

        if data_process.exitcode != 0:
            raise RuntimeError(f"Data upload failed with exit code {data_process.exitcode}")
        if prov_process.exitcode != 0:
            raise RuntimeError(f"Provenance upload failed with exit code {prov_process.exitcode}")

    def store_data_and_prov(
        self, res_storer: Storer, prov_storer: Storer
    ) -> None:
        """Orchestrate storage and upload using appropriate strategy.

        When ``virtuoso_bulk_load.enabled`` is set in the configuration, the
        bulk-load config is forwarded so n-quads are produced and bulk-loaded;
        otherwise plain SPARQL updates are generated and uploaded.
        """
        self._setup_output_directories()

        bulk_config = self.settings.get("virtuoso_bulk_load", {})
        if bulk_config.get("enabled", False):
            self._store_and_upload(res_storer, prov_storer, self.timer, bulk_config)
        else:
            self._store_and_upload(res_storer, prov_storer, self.timer)

    def _store_and_upload(
        self, res_storer: Storer, prov_storer: Storer, timer: ProcessTimer, bulk_config: dict = None
    ) -> None:
        """Store RDF files and upload queries to triplestore with parallel execution.

        :param bulk_config: when not None, Virtuoso bulk-load mode is used:
            workers write gzipped n-quads instead of saving SPARQL queries,
            and the files are bulk-loaded into the configured containers.
        """
        use_bulk_load = bulk_config is not None

        if use_bulk_load:
            data_nquads_dir = os.path.abspath(bulk_config["data_mount_dir"])
            prov_nquads_dir = os.path.abspath(bulk_config["prov_mount_dir"])
            os.makedirs(data_nquads_dir, exist_ok=True)
            os.makedirs(prov_nquads_dir, exist_ok=True)
        else:
            data_nquads_dir = None
            prov_nquads_dir = None

        with timer.timer("storage"):
            # 'fork' lets the children inherit the in-memory Storer objects.
            ctx = multiprocessing.get_context('fork')
            rdf_store_processes = []

            # RDF file storage (optional) runs concurrently with query
            # generation; its processes are joined at the very end.
            if self.generate_rdf_files:
                data_store_process = ctx.Process(
                    target=_store_rdf_worker,
                    args=(res_storer, self.output_rdf_dir, self.base_iri, self.context_path)
                )
                prov_store_process = ctx.Process(
                    target=_store_rdf_worker,
                    args=(prov_storer, self.output_rdf_dir, self.base_iri, self.context_path)
                )
                rdf_store_processes = [data_store_process, prov_store_process]
                for p in rdf_store_processes:
                    p.start()

            # save_queries and prepare_bulk_load are mutually exclusive.
            data_query_process = ctx.Process(
                target=_generate_queries_worker,
                args=(res_storer, self.triplestore_url, self.data_update_dir,
                      not use_bulk_load, use_bulk_load, data_nquads_dir)
            )
            prov_query_process = ctx.Process(
                target=_generate_queries_worker,
                args=(prov_storer, self.provenance_triplestore_url, self.prov_update_dir,
                      not use_bulk_load, use_bulk_load, prov_nquads_dir)
            )
            data_query_process.start()
            prov_query_process.start()
            data_query_process.join()
            prov_query_process.join()

            if data_query_process.exitcode != 0:
                raise RuntimeError(f"Data query generation failed with exit code {data_query_process.exitcode}")
            if prov_query_process.exitcode != 0:
                raise RuntimeError(f"Prov query generation failed with exit code {prov_query_process.exitcode}")

            self._upload_sparql_queries()

            if use_bulk_load:
                _run_bulk_load_process((bulk_config["data_container"], bulk_config["bulk_load_dir"], data_nquads_dir))
                _run_bulk_load_process((bulk_config["prov_container"], bulk_config["bulk_load_dir"], prov_nquads_dir))
                # Remove the n-quads once loaded so they are not re-ingested.
                for nq_file in glob.glob(os.path.join(data_nquads_dir, "*.nq.gz")):
                    os.remove(nq_file)
                for nq_file in glob.glob(os.path.join(prov_nquads_dir, "*.nq.gz")):
                    os.remove(nq_file)

            for p in rdf_store_processes:
                p.join()
                if p.exitcode != 0:
                    raise RuntimeError(f"RDF storage failed with exit code {p.exitcode}")

    def run_sparql_updates(self, endpoint: str, folder: str):
        """Synchronously upload the SPARQL updates in *folder* to *endpoint*,
        using the instance's Redis cache and failure/stop-file settings."""
        upload_sparql_updates(
            endpoint=endpoint,
            folder=folder,
            failed_file=self.ts_failed_queries,
            stop_file=self.ts_stop_file,
            redis_host=self.redis_host,
            redis_port=self.redis_port,
            redis_db=self.redis_cache_db,
        )
def _save_incremental_report(all_reports: List[Dict[str, Any]], meta_config_path: str, output_path: str) -> None:
    """Write the running timing report, plus aggregate stats, to *output_path* as JSON."""
    payload = {
        "timestamp": datetime.now().isoformat(),
        "config_path": meta_config_path,
        "total_files_processed": len(all_reports),
        "files": all_reports,
        "aggregate": _compute_aggregate_metrics(all_reports),
    }
    with open(output_path, 'w') as out_file:
        json.dump(payload, out_file, indent=2)
470def _compute_aggregate_metrics(all_reports: List[Dict[str, Any]]) -> Dict[str, Any]:
471 """Compute aggregate statistics across all file reports."""
472 if not all_reports:
473 return {}
475 total_duration = sum(r["report"]["metrics"].get("total_duration_seconds", 0) for r in all_reports)
476 total_records = sum(r["report"]["metrics"].get("input_records", 0) for r in all_reports)
477 total_entities = sum(r["report"]["metrics"].get("entities_created", 0) for r in all_reports)
479 durations = [r["report"]["metrics"].get("total_duration_seconds", 0) for r in all_reports]
480 throughputs = [r["report"]["metrics"].get("throughput_records_per_sec", 0) for r in all_reports]
482 return {
483 "total_files": len(all_reports),
484 "total_duration_seconds": round(total_duration, 3),
485 "total_records_processed": total_records,
486 "total_entities_created": total_entities,
487 "average_time_per_file": round(total_duration / len(all_reports), 3) if all_reports else 0,
488 "average_throughput": round(sum(throughputs) / len(throughputs), 2) if throughputs else 0,
489 "min_time": round(min(durations), 3) if durations else 0,
490 "max_time": round(max(durations), 3) if durations else 0,
491 "overall_throughput": round(total_records / total_duration, 2) if total_duration > 0 else 0
492 }
def _print_aggregate_summary(all_reports: List[Dict[str, Any]]) -> None:
    """Print aggregate summary of all processed files.

    No-op when there are no reports: ``_compute_aggregate_metrics`` returns
    an empty dict for an empty list, which would otherwise make every
    ``aggregate[...]`` lookup below raise ``KeyError``.
    """
    aggregate = _compute_aggregate_metrics(all_reports)
    if not aggregate:
        return

    print(f"\n{'='*60}")
    print("Aggregate Timing Summary")
    print(f"{'='*60}")
    print(f"Total Files: {aggregate['total_files']}")
    print(f"Total Duration: {aggregate['total_duration_seconds']}s")
    print(f"Total Records: {aggregate['total_records_processed']}")
    print(f"Total Entities: {aggregate['total_entities_created']}")
    print(f"Average Time/File: {aggregate['average_time_per_file']}s")
    print(f"Min/Max Time: {aggregate['min_time']}s / {aggregate['max_time']}s")
    print(f"Overall Throughput: {aggregate['overall_throughput']} rec/s")
    print(f"{'='*60}\n")
def run_meta_process(
    settings: dict, meta_config_path: str, enable_timing: bool = False, timing_output: Optional[str] = None
) -> None:
    """Top-level driver: process every pending input CSV file.

    :param settings: parsed YAML configuration, forwarded to ``MetaProcess``
    :param meta_config_path: path of the YAML configuration file
    :param enable_timing: collect and print per-file / aggregate timing
    :param timing_output: optional path for an incrementally-updated JSON
        timing report (a companion chart PNG is written next to it)
    """
    is_unix = platform in {"linux", "linux2", "darwin"}
    all_reports = []

    # A throwaway instance is used only for folder preparation and paths; a
    # fresh MetaProcess is created per file inside the loop.
    meta_process_setup = MetaProcess(settings=settings, meta_config_path=meta_config_path)
    files_to_be_processed = meta_process_setup.prepare_folders()

    # Writes gently_run / gently_stop helper scripts and clears a stale .stop.
    generate_gentle_buttons(meta_process_setup.base_output_dir, meta_config_path, is_unix)

    with tqdm(total=len(files_to_be_processed), desc="Processing files") as progress_bar:
        for idx, filename in enumerate(files_to_be_processed, 1):
            try:
                # Graceful shutdown: a ".stop" file in the output dir halts
                # the loop before the next file is started.
                if os.path.exists(os.path.join(meta_process_setup.base_output_dir, ".stop")):
                    print("\nStop file detected. Halting processing.")
                    break

                if enable_timing:
                    print(f"\n[{idx}/{len(files_to_be_processed)}] Processing (unknown)...")

                file_timer = ProcessTimer(enabled=enable_timing, verbose=enable_timing)
                # Fresh instance per file: avoids cross-file state carry-over
                # (e.g. the info_dir mutation in curate_and_create).
                meta_process = MetaProcess(settings=settings, meta_config_path=meta_config_path, timer=file_timer)

                result = meta_process.curate_and_create(
                    filename,
                    meta_process.cache_path,
                    meta_process.errors_path,
                    settings=settings,
                    meta_config_path=meta_config_path
                )
                # Records success in the cache file or appends to errors.txt.
                task_done(result)

                if enable_timing:
                    report = file_timer.get_report()
                    all_reports.append({
                        "filename": filename,
                        "report": report
                    })

                    file_timer.print_file_summary(filename)

                    if timing_output:
                        _save_incremental_report(all_reports, meta_config_path, timing_output)
                        print(f"\n JSON updated: {timing_output}")

                    chart_file = timing_output.replace('.json', '_chart.png') if timing_output else 'meta_process_timing_chart.png'
                    plot_incremental_progress(all_reports, chart_file)
                    print(f" Chart updated: {chart_file}\n")

            except Exception as e:
                # Per-file failures (including BulkLoadError re-raised by
                # curate_and_create) are logged and processing continues.
                traceback_str = traceback.format_exc()
                print(
                    f"Error processing file (unknown): {e}\nTraceback:\n{traceback_str}"
                )
            finally:
                progress_bar.update(1)

    # On a clean (non-stopped) finish, archive the cache file under a
    # timestamped name so the next run starts with an empty cache.
    if not os.path.exists(os.path.join(meta_process_setup.base_output_dir, ".stop")):
        if os.path.exists(meta_process_setup.cache_path):
            os.rename(
                meta_process_setup.cache_path,
                meta_process_setup.cache_path.replace(
                    ".txt", f'_{datetime.now().strftime("%Y-%m-%dT%H_%M_%S_%f")}.txt'
                ),
            )
    if is_unix:
        delete_lock_files(base_dir=meta_process_setup.base_output_dir)

    if enable_timing and all_reports:
        _print_aggregate_summary(all_reports)
        if timing_output:
            # Final full report (same shape as the incremental one).
            aggregate_report = {
                "timestamp": datetime.now().isoformat(),
                "config_path": meta_config_path,
                "total_files": len(all_reports),
                "files": all_reports,
                "aggregate": _compute_aggregate_metrics(all_reports)
            }
            with open(timing_output, 'w') as f:
                json.dump(aggregate_report, f, indent=2)
            print(f"[Timing] Report saved to {timing_output}")
def task_done(task_output: tuple) -> None:
    """Record the outcome of one processed file.

    On success the filename is appended to the cache file (kept numerically
    sorted when all entries are ``<number>.csv``); on failure the error
    message is appended to the errors file; ``"skip"`` is a no-op.
    """
    message, cache_path, errors_path, filename = task_output
    status = message["message"]
    if status == "skip":
        return
    if status == "success":
        if not os.path.exists(cache_path):
            with open(cache_path, "w", encoding="utf-8") as aux_file:
                aux_file.write(filename + "\n")
            return
        with open(cache_path, "r", encoding="utf-8") as aux_file:
            cache_data = aux_file.read().splitlines()
        cache_data.append(filename)
        try:
            # Numeric sort when every entry is "<n>.csv"; otherwise keep order.
            data_sorted = sorted(
                cache_data,
                key=lambda entry: int(entry.replace(".csv", "")),
            )
        except ValueError:
            data_sorted = cache_data
        with open(cache_path, "w", encoding="utf-8") as aux_file:
            aux_file.write("\n".join(data_sorted))
        return
    with open(errors_path, "a", encoding="utf-8") as aux_file:
        aux_file.write(f'(unknown): {message["message"]}' + "\n")
def chunks(lst: list, n: int) -> Iterator[list]:
    """Yield successive n-sized chunks from lst."""
    start = 0
    while start < len(lst):
        yield lst[start : start + n]
        start += n
def delete_lock_files(base_dir: str) -> None:
    """Recursively remove every ``.lock`` file under *base_dir*.

    Fix: the annotation previously declared ``base_dir: list``, but the value
    is a directory path string — it is passed straight to ``os.walk`` and the
    caller supplies ``MetaProcess.base_output_dir``.
    """
    for dirpath, _, filenames in os.walk(base_dir):
        for filename in filenames:
            if filename.endswith(".lock"):
                os.remove(os.path.join(dirpath, filename))
def generate_gentle_buttons(dir: str, config: str, is_unix: bool):
    """Write ``gently_run``/``gently_stop`` helper scripts into the CWD and
    clear any stale ``.stop`` flag left in *dir* by a previous run."""
    stop_flag = os.path.join(dir, ".stop")
    if os.path.exists(stop_flag):
        os.remove(stop_flag)
    ext = "sh" if is_unix else "bat"
    run_commands = (
        f'{executable} -m oc_meta.lib.stopper -t "{dir}" --remove\n'
        f'{executable} -m oc_meta.run.meta_process -c {config}'
    )
    with open(f"gently_run.{ext}", "w") as script_file:
        script_file.write(run_commands)
    with open(f"gently_stop.{ext}", "w") as script_file:
        script_file.write(f'{executable} -m oc_meta.lib.stopper -t "{dir}" --add')
if __name__ == "__main__":  # pragma: no cover
    # Command-line entry point: parse the config path and optional timing
    # flags, load the YAML settings, and hand off to run_meta_process.
    arg_parser = ArgumentParser(
        "meta_process.py",
        description="This script runs the OCMeta data processing workflow",
    )
    arg_parser.add_argument(
        "-c",
        "--config",
        dest="config",
        required=True,
        help="Configuration file directory",
    )
    arg_parser.add_argument(
        "--timing",
        action="store_true",
        help="Enable timing metrics collection and display summary at the end",
    )
    arg_parser.add_argument(
        "--timing-output",
        dest="timing_output",
        default=None,
        help="Optional path to save timing report as JSON file",
    )
    args = arg_parser.parse_args()
    with open(args.config, encoding="utf-8") as file:
        # full_load uses yaml.FullLoader: resolves standard YAML tags but
        # does not execute arbitrary Python objects.
        settings = yaml.full_load(file)
    run_meta_process(
        settings=settings,
        meta_config_path=args.config,
        enable_timing=args.timing,
        timing_output=args.timing_output
    )