Coverage for oc_ds_converter / lib / csvmanager.py: 50%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from csv import DictReader, writer 

6from io import StringIO 

7from os import mkdir, sep, walk 

8from os.path import exists, isdir, join 

9from typing import Dict 

10 

11 

12class CSVManager(object): 

13 ''' 

14 This class is able to load a simple CSV composed by two fields, 'id' and 

15 'value', and then to index all its items in a structured form so as to be 

16 easily queried. In addition, it allows one to store new information in the CSV, 

17 if needed. 

18 ''' 

19 def __init__(self, output_path: str | None = None, line_threshold: int = 10000, low_memory: bool = False): 

20 self.output_path = output_path 

21 self.data:Dict[str, set] = {} 

22 self.data_to_store = list() 

23 if output_path is not None: 

24 self.existing_files = self.__get_existing_files() 

25 if low_memory: 

26 self.__load_all_csv_files(self.existing_files, fun=self.__low_memory_load, line_threshold=line_threshold) 

27 else: 

28 self.__load_csv() 

29 

30 def __get_existing_files(self) -> list: 

31 files_to_process = [] 

32 if exists(self.output_path): 

33 for cur_dir, _, cur_files in walk(self.output_path): 

34 for cur_file in cur_files: 

35 if cur_file.endswith('.csv'): 

36 files_to_process.append(cur_dir + sep + cur_file) 

37 else: 

38 mkdir(self.output_path) 

39 return files_to_process 

40 

41 @staticmethod 

42 def load_csv_column_as_set(file_or_dir_path:str, key:str, line_threshold:int=10000): 

43 result = set() 

44 if exists(file_or_dir_path): 

45 file_to_process = [] 

46 if isdir(file_or_dir_path): 

47 for cur_dir, _, cur_files in walk(file_or_dir_path): 

48 for cur_file in cur_files: 

49 if cur_file.endswith('.csv'): 

50 file_to_process.append(cur_dir + sep + cur_file) 

51 else: 

52 file_to_process.append(file_or_dir_path) 

53 for item in CSVManager.__load_all_csv_files(file_to_process, CSVManager.__load_csv_by_key, line_threshold=line_threshold, key=key): 

54 result.update(item) 

55 return result 

56 

57 @staticmethod 

58 def __load_csv_by_key(csv_string, key): 

59 result = set() 

60 csv_metadata = DictReader(StringIO(csv_string), delimiter=',') 

61 for row in csv_metadata: 

62 result.add(row[key]) 

63 return result 

64 

65 @staticmethod 

66 def __load_all_csv_files(file_to_process, fun, line_threshold, **params): 

67 result = [] 

68 header = None 

69 for csv_path in file_to_process: 

70 with open(csv_path, encoding='utf-8') as f: 

71 csv_content = '' 

72 for idx, line in enumerate(f.readlines()): 

73 if header is None: 

74 header = line 

75 csv_content = header 

76 else: 

77 if idx % line_threshold == 0: 

78 result.append(fun(csv_content, **params)) 

79 csv_content = header 

80 csv_content += line 

81 result.append(fun(csv_content, **params)) 

82 return result 

83 

84 def dump_data(self, file_name:str) -> None: 

85 path = join(self.output_path, file_name) 

86 if not exists(path): 

87 with open(path, 'w', encoding='utf-8', newline='') as f: 

88 f.write('"id","value"\n') 

89 with open(path, 'a', encoding='utf-8', newline='') as f: 

90 csv_writer = writer(f, delimiter=',') 

91 for el in self.data_to_store: 

92 csv_writer.writerow([el[0].replace('"', '""'), el[1].replace('"', '""')]) 

93 self.data_to_store = list() 

94 

95 def get_value(self, id_string): 

96 ''' 

97 It returns the set of values associated to the input 'id_string', 

98 or None if 'id_string' is not included in the CSV. 

99 ''' 

100 if id_string in self.data: 

101 return set(self.data[id_string]) 

102 

103 def get_values_batch(self, ids: list[str]) -> dict[str, set[str]]: 

104 return {id_str: set(self.data[id_str]) for id_str in ids if id_str in self.data} 

105 

106 def add_value(self, id_string, value): 

107 ''' 

108 It adds the value specified in the set of values associated to 'id_string'. 

109 If the object was created with the option of storing also the data in a CSV 

110 ('store_new' = True, default behaviour), then it also add new data in the CSV. 

111 ''' 

112 self.data.setdefault(id_string, set()) 

113 if value not in self.data[id_string]: 

114 self.data[id_string].add(value) 

115 self.data_to_store.append([id_string, value]) 

116 

117 def __load_csv(self): 

118 for file in self.existing_files: 

119 with open(file, 'r', encoding='utf-8') as f: 

120 reader = DictReader(f) 

121 for row in reader: 

122 self.data.setdefault(row['id'], set()) 

123 self.data[row['id']].add(row['value']) 

124 

125 def __low_memory_load(self, csv_string:str): 

126 csv_metadata = DictReader(StringIO(csv_string), delimiter=',') 

127 for row in csv_metadata: 

128 cur_id = row['id'] 

129 if cur_id not in self.data: 

130 self.data[cur_id] = set() 

131 self.data[cur_id].add(row['value'])