Coverage for oc_meta / run / meta / merge_csv.py: 100%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7from __future__ import annotations 

8 

9import argparse 

10import os 

11from typing import List 

12 

13from rich.table import Table 

14from rich_argparse import RichHelpFormatter 

15 

16from oc_meta.lib.console import console, create_progress 

17from oc_meta.lib.file_manager import get_csv_data, write_csv 

18 

19 

def get_csv_files(directory: str) -> List[str]:
    """Return the paths of the ``.csv`` entries directly inside *directory*.

    The listing is not recursive. Entries are selected purely by their
    ``.csv`` name suffix (case-sensitive) and are returned in sorted name
    order so that repeated runs process the files in the same sequence.

    Args:
        directory: Path of the directory to scan.

    Returns:
        A list of ``os.path.join(directory, name)`` paths, sorted by name.
        The list is empty when no entry ends with ``.csv``.

    Raises:
        ValueError: If *directory* is not an existing directory.
    """
    if not os.path.isdir(directory):
        raise ValueError(
            "The specified path '{}' is not a directory".format(directory)
        )
    # os.listdir() yields entries in arbitrary order; sorting makes the
    # result (and anything built from it) deterministic across platforms.
    return [
        os.path.join(directory, name)
        for name in sorted(os.listdir(directory))
        if name.endswith(".csv")
    ]

30 

31 

def resolve_output_path(output: str) -> str:
    """Turn the user-supplied *output* argument into a concrete CSV path.

    Any directory needed to hold the resulting file is created as a side
    effect.

    Args:
        output: Either a path ending in ``.csv`` (used as the file itself) or
            any other path (treated as a folder).

    Returns:
        *output* unchanged when it ends with ``.csv``; otherwise the path of
        ``merged.csv`` inside the *output* folder.
    """
    # Folder case: make sure the folder exists and place merged.csv in it.
    if not output.endswith(".csv"):
        os.makedirs(output, exist_ok=True)
        return os.path.join(output, "merged.csv")

    # File case: only the containing folder (if the path names one) is created.
    containing_folder = os.path.split(output)[0]
    if containing_folder:
        os.makedirs(containing_folder, exist_ok=True)
    return output

40 

41 

def merge_csv_files(
    input_dir: str, output_path: str
) -> tuple[int, int, List[str]]:
    """Concatenate the rows of every CSV file in *input_dir* into one file.

    Each file returned by ``get_csv_files`` is read with
    ``get_csv_data(..., clean_data=False)`` and its rows are appended, in
    listing order, to a single list that is finally handed to ``write_csv``.
    Nothing is written when no rows were collected.

    Args:
        input_dir: Directory whose ``.csv`` files are merged.
        output_path: Path of the CSV file to write.

    Returns:
        A ``(total_rows, files_found, fieldnames)`` tuple. ``files_found``
        counts every CSV file listed, including files that yielded no rows.
        ``(0, 0, [])`` is returned when the directory holds no CSV file.
    """
    sources = get_csv_files(input_dir)
    if not sources:
        return 0, 0, []

    merged_rows: List[dict[str, str]] = []
    header: List[str] = []

    with create_progress() as progress:
        task = progress.add_task("Merging CSV files", total=len(sources))

        for source in sources:
            rows = get_csv_data(source, clean_data=False)
            if rows:
                # The header is taken once, from the first file with rows.
                # NOTE(review): later files are presumably expected to share
                # these columns - confirm how write_csv treats other keys.
                header = header or list(rows[0].keys())
                merged_rows.extend(rows)
            # Empty files still count as a completed step.
            progress.advance(task)

    if merged_rows:
        write_csv(output_path, merged_rows, fieldnames=header)

    return len(merged_rows), len(sources), header

67 

68 

def print_merge_report(
    total_rows: int, files_processed: int, output_path: str
) -> None:
    """Print a two-column "Merge report" table to the shared console.

    Args:
        total_rows: Number of rows to report as "Total rows".
        files_processed: Number of files to report as "Files merged".
        output_path: Path to report as "Output file".
    """
    report = Table(title="Merge report")
    for heading, colour in (("Metric", "cyan"), ("Value", "green")):
        report.add_column(heading, style=colour)

    summary = (
        ("Files merged", str(files_processed)),
        ("Total rows", str(total_rows)),
        ("Output file", output_path),
    )
    for metric, value in summary:
        report.add_row(metric, value)

    console.print(report)

81 

82 

def main() -> int:  # pragma: no cover
    """Command-line entry point: merge the CSV files of a directory.

    Reads the ``input_dir`` and ``output`` positional arguments, checks the
    input directory, resolves the output location, merges the files and
    prints a summary table.

    Returns:
        ``1`` when the input directory does not exist or contains no CSV
        file, ``0`` otherwise.
    """
    cli = argparse.ArgumentParser(
        description="Merge multiple CSV files into a single file",
        formatter_class=RichHelpFormatter,
    )
    cli.add_argument("input_dir", help="Directory containing input CSV files")
    cli.add_argument(
        "output",
        help="Output path: if ends with .csv, creates that file; otherwise, creates merged.csv in that folder",
    )
    options = cli.parse_args()
    source_dir = options.input_dir

    if not os.path.isdir(source_dir):
        missing_dir_message = (
            "[red]Input directory does not exist: {}[/red]".format(source_dir)
        )
        console.print(missing_dir_message)
        return 1

    # Resolved before the inputs are listed; this step may create directories.
    destination = resolve_output_path(options.output)

    found = get_csv_files(source_dir)
    if not found:
        no_files_message = (
            "[red]No CSV files found in directory: {}[/red]".format(source_dir)
        )
        console.print(no_files_message)
        return 1

    found_message = "Found [green]{}[/green] CSV files to merge".format(len(found))
    console.print(found_message)

    # The returned fieldnames are not needed for the report.
    row_count, file_count, _ = merge_csv_files(source_dir, destination)
    print_merge_report(row_count, file_count, destination)

    return 0

119 

120 

if __name__ == "__main__":  # pragma: no cover
    # Propagate main()'s return value (1 on the error paths, 0 on success)
    # as the process exit status so that shells and CI can detect failures.
    raise SystemExit(main())