Coverage for oc_meta / run / meta / merge_csv.py: 100%
46 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7from __future__ import annotations
9import argparse
10import os
11from typing import List
13from rich.table import Table
14from rich_argparse import RichHelpFormatter
16from oc_meta.lib.console import console, create_progress
17from oc_meta.lib.file_manager import get_csv_data, write_csv
def get_csv_files(directory: str) -> List[str]:
    """Return the paths of the CSV files located directly in *directory*.

    Only regular files whose name ends with ``.csv`` (case-sensitive) are
    returned; sub-directories are not descended into, and a directory that
    merely has a ``.csv`` name is skipped. The result is sorted because
    ``os.listdir`` yields entries in arbitrary order, which would otherwise
    make the processing order differ between runs and platforms.

    Args:
        directory: Path of the directory to scan.

    Returns:
        The matching paths, each joined with *directory*, in sorted order.

    Raises:
        ValueError: If *directory* is not an existing directory.
    """
    if not os.path.isdir(directory):
        raise ValueError(
            "The specified path '{}' is not a directory".format(directory)
        )
    candidates = (
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if name.endswith(".csv")
    )
    # Keep regular files only: a folder called "something.csv" cannot be read
    # as CSV data.
    return sorted(path for path in candidates if os.path.isfile(path))
def resolve_output_path(output: str) -> str:
    """Turn the user-supplied *output* into the path of the merged CSV file.

    When *output* ends with ``.csv`` it is treated as the target file and
    returned unchanged; its parent directory, if it has one, is created.
    Otherwise *output* is treated as a directory, which is created, and the
    returned path is ``merged.csv`` inside it.

    Args:
        output: Either a ``.csv`` file path or a directory path.

    Returns:
        The path of the file the merged data should be written to.
    """
    names_a_file = output.endswith(".csv")
    folder = os.path.dirname(output) if names_a_file else output

    # A bare file name such as "out.csv" has no parent folder to create; a
    # directory target is always created.
    if folder or not names_a_file:
        os.makedirs(folder, exist_ok=True)

    return output if names_a_file else os.path.join(folder, "merged.csv")
def merge_csv_files(
    input_dir: str, output_path: str
) -> tuple[int, int, List[str]]:
    """Concatenate the rows of every CSV file in *input_dir* into one file.

    Files are read with ``get_csv_data(..., clean_data=False)`` and their rows
    are appended in the order returned by ``get_csv_files``. The column list
    is taken from the first row of the first file that yields data. Nothing
    is written when no rows were collected.

    Args:
        input_dir: Directory containing the CSV files to merge.
        output_path: Path of the file handed to ``write_csv``.

    Returns:
        A tuple ``(rows merged, CSV files found, column names)``. The file
        count includes files that yielded no rows. ``(0, 0, [])`` is returned
        when the directory holds no CSV files.
    """
    sources = get_csv_files(input_dir)
    if len(sources) == 0:
        return 0, 0, []

    merged: List[dict[str, str]] = []
    header: List[str] = []

    with create_progress() as progress:
        task = progress.add_task("Merging CSV files", total=len(sources))

        for source in sources:
            rows = get_csv_data(source, clean_data=False)
            if rows:
                # The first file that actually has rows defines the columns.
                header = header or list(rows[0].keys())
                merged += rows
            progress.advance(task)

    if merged:
        write_csv(output_path, merged, fieldnames=header)

    return len(merged), len(sources), header
def print_merge_report(
    total_rows: int, files_processed: int, output_path: str
) -> None:
    """Print a two-column "Merge report" table on the shared console.

    Args:
        total_rows: Number of rows shown in the "Total rows" line.
        files_processed: Number of files shown in the "Files merged" line.
        output_path: Path shown in the "Output file" line.
    """
    report = Table(title="Merge report")
    for heading, colour in (("Metric", "cyan"), ("Value", "green")):
        report.add_column(heading, style=colour)

    summary = (
        ("Files merged", str(files_processed)),
        ("Total rows", str(total_rows)),
        ("Output file", output_path),
    )
    for label, value in summary:
        report.add_row(label, value)

    console.print(report)
def main() -> int:  # pragma: no cover
    """Command-line entry point: merge the CSV files of a directory.

    Parses the ``input_dir`` and ``output`` positional arguments, validates
    the input directory, merges its CSV files and prints a summary table.

    Returns:
        ``0`` on success, ``1`` when the input directory does not exist or
        contains no CSV files.
    """
    parser = argparse.ArgumentParser(
        description="Merge multiple CSV files into a single file",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument("input_dir", help="Directory containing input CSV files")
    parser.add_argument(
        "output",
        help="Output path: if ends with .csv, creates that file; otherwise, creates merged.csv in that folder",
    )
    args = parser.parse_args()

    if not os.path.isdir(args.input_dir):
        console.print(
            "[red]Input directory does not exist: {}[/red]".format(args.input_dir)
        )
        return 1

    csv_files = get_csv_files(args.input_dir)
    if not csv_files:
        console.print(
            "[red]No CSV files found in directory: {}[/red]".format(args.input_dir)
        )
        return 1

    # Resolve the output only after the input has been validated:
    # resolve_output_path creates directories, and an aborted run should not
    # leave empty folders behind.
    output_path = resolve_output_path(args.output)

    console.print(
        "Found [green]{}[/green] CSV files to merge".format(len(csv_files))
    )

    total_rows, files_processed, _ = merge_csv_files(args.input_dir, output_path)

    print_merge_report(total_rows, files_processed, output_path)

    return 0
if __name__ == "__main__":  # pragma: no cover
    # Use main()'s return value as the process exit status so that failures
    # (return code 1) are visible to the calling shell or script.
    raise SystemExit(main())