Coverage for oc_ds_converter / lib / csvmanager.py: 50%
96 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from csv import DictReader, writer
6from io import StringIO
7from os import mkdir, sep, walk
8from os.path import exists, isdir, join
9from typing import Dict
12class CSVManager(object):
13 '''
14 This class is able to load a simple CSV composed by two fields, 'id' and
15 'value', and then to index all its items in a structured form so as to be
16 easily queried. In addition, it allows one to store new information in the CSV,
17 if needed.
18 '''
19 def __init__(self, output_path: str | None = None, line_threshold: int = 10000, low_memory: bool = False):
20 self.output_path = output_path
21 self.data:Dict[str, set] = {}
22 self.data_to_store = list()
23 if output_path is not None:
24 self.existing_files = self.__get_existing_files()
25 if low_memory:
26 self.__load_all_csv_files(self.existing_files, fun=self.__low_memory_load, line_threshold=line_threshold)
27 else:
28 self.__load_csv()
30 def __get_existing_files(self) -> list:
31 files_to_process = []
32 if exists(self.output_path):
33 for cur_dir, _, cur_files in walk(self.output_path):
34 for cur_file in cur_files:
35 if cur_file.endswith('.csv'):
36 files_to_process.append(cur_dir + sep + cur_file)
37 else:
38 mkdir(self.output_path)
39 return files_to_process
41 @staticmethod
42 def load_csv_column_as_set(file_or_dir_path:str, key:str, line_threshold:int=10000):
43 result = set()
44 if exists(file_or_dir_path):
45 file_to_process = []
46 if isdir(file_or_dir_path):
47 for cur_dir, _, cur_files in walk(file_or_dir_path):
48 for cur_file in cur_files:
49 if cur_file.endswith('.csv'):
50 file_to_process.append(cur_dir + sep + cur_file)
51 else:
52 file_to_process.append(file_or_dir_path)
53 for item in CSVManager.__load_all_csv_files(file_to_process, CSVManager.__load_csv_by_key, line_threshold=line_threshold, key=key):
54 result.update(item)
55 return result
57 @staticmethod
58 def __load_csv_by_key(csv_string, key):
59 result = set()
60 csv_metadata = DictReader(StringIO(csv_string), delimiter=',')
61 for row in csv_metadata:
62 result.add(row[key])
63 return result
65 @staticmethod
66 def __load_all_csv_files(file_to_process, fun, line_threshold, **params):
67 result = []
68 header = None
69 for csv_path in file_to_process:
70 with open(csv_path, encoding='utf-8') as f:
71 csv_content = ''
72 for idx, line in enumerate(f.readlines()):
73 if header is None:
74 header = line
75 csv_content = header
76 else:
77 if idx % line_threshold == 0:
78 result.append(fun(csv_content, **params))
79 csv_content = header
80 csv_content += line
81 result.append(fun(csv_content, **params))
82 return result
84 def dump_data(self, file_name:str) -> None:
85 path = join(self.output_path, file_name)
86 if not exists(path):
87 with open(path, 'w', encoding='utf-8', newline='') as f:
88 f.write('"id","value"\n')
89 with open(path, 'a', encoding='utf-8', newline='') as f:
90 csv_writer = writer(f, delimiter=',')
91 for el in self.data_to_store:
92 csv_writer.writerow([el[0].replace('"', '""'), el[1].replace('"', '""')])
93 self.data_to_store = list()
95 def get_value(self, id_string):
96 '''
97 It returns the set of values associated to the input 'id_string',
98 or None if 'id_string' is not included in the CSV.
99 '''
100 if id_string in self.data:
101 return set(self.data[id_string])
103 def get_values_batch(self, ids: list[str]) -> dict[str, set[str]]:
104 return {id_str: set(self.data[id_str]) for id_str in ids if id_str in self.data}
106 def add_value(self, id_string, value):
107 '''
108 It adds the value specified in the set of values associated to 'id_string'.
109 If the object was created with the option of storing also the data in a CSV
110 ('store_new' = True, default behaviour), then it also add new data in the CSV.
111 '''
112 self.data.setdefault(id_string, set())
113 if value not in self.data[id_string]:
114 self.data[id_string].add(value)
115 self.data_to_store.append([id_string, value])
117 def __load_csv(self):
118 for file in self.existing_files:
119 with open(file, 'r', encoding='utf-8') as f:
120 reader = DictReader(f)
121 for row in reader:
122 self.data.setdefault(row['id'], set())
123 self.data[row['id']].add(row['value'])
125 def __low_memory_load(self, csv_string:str):
126 csv_metadata = DictReader(StringIO(csv_string), delimiter=',')
127 for row in csv_metadata:
128 cur_id = row['id']
129 if cur_id not in self.data:
130 self.data[cur_id] = set()
131 self.data[cur_id].add(row['value'])