Coverage for oc_meta/lib/csvmanager.py: 91%
94 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2019, Silvio Peroni <essepuntato@gmail.com>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
17from csv import DictReader, writer
18from io import StringIO
19from os import mkdir, sep, walk
20from os.path import exists, isdir, join
21from typing import Dict
class CSVManager(object):
    '''
    This class is able to load simple CSV files composed by two fields, 'id' and
    'value', and to index all their rows in a dictionary mapping each 'id' to the
    set of 'value' strings found for it, so as to be easily queried. In addition,
    it allows one to queue new (id, value) pairs and to append them to a CSV file
    stored in the output directory.
    '''

    def __init__(self, output_path:str=None, line_threshold=10000, low_memory:bool=False):
        '''
        Create the manager and, if 'output_path' is specified, index every '.csv'
        file found under it (the directory is created when it does not exist).

        'line_threshold' is the maximum number of physical lines parsed at once
        when 'low_memory' is True; it is ignored otherwise.
        '''
        self.output_path = output_path
        self.data:Dict[str, set] = {}
        self.data_to_store = list()
        # Always defined, so that the attribute can be read even when no
        # output path has been specified.
        self.existing_files = []
        if output_path is not None:
            self.existing_files = self.__get_existing_files()
            if low_memory:
                self.__load_all_csv_files(self.existing_files, fun=self.__low_memory_load, line_threshold=line_threshold)
            else:
                self.__load_csv()

    @staticmethod
    def __find_csv_files(root_path:str) -> list:
        '''
        It returns the paths of all the files ending with '.csv' found by walking
        'root_path' recursively.
        '''
        # Paths are built as 'dir + sep + file' (not with 'join') to keep
        # producing exactly the same strings as before.
        return [
            cur_dir + sep + cur_file
            for cur_dir, _, cur_files in walk(root_path)
            for cur_file in cur_files
            if cur_file.endswith('.csv')
        ]

    def __get_existing_files(self) -> list:
        '''
        It returns the CSV files stored under the output path. If the output path
        does not exist, it is created and an empty list is returned.
        '''
        if exists(self.output_path):
            return self.__find_csv_files(self.output_path)
        mkdir(self.output_path)
        return []

    @staticmethod
    def load_csv_column_as_set(file_or_dir_path:str, key:str, line_threshold:int=10000):
        '''
        It returns the set of all the values found in the column 'key' of the CSV
        file 'file_or_dir_path' or, if that path is a directory, of all the '.csv'
        files found under it. An empty set is returned if the path does not exist.
        A KeyError is raised if a row has no column named 'key'.
        '''
        result = set()
        if not exists(file_or_dir_path):
            return result
        if isdir(file_or_dir_path):
            file_to_process = CSVManager.__find_csv_files(file_or_dir_path)
        else:
            file_to_process = [file_or_dir_path]
        for item in CSVManager.__load_all_csv_files(file_to_process, CSVManager.__load_csv_by_key, line_threshold=line_threshold, key=key):
            result.update(item)
        return result

    @staticmethod
    def __load_csv_by_key(csv_string, key):
        '''
        It parses 'csv_string' (header included) and returns the set of the values
        found in the column 'key'.
        '''
        return {row[key] for row in DictReader(StringIO(csv_string), delimiter=',')}

    @staticmethod
    def __load_all_csv_files(file_to_process, fun, line_threshold, **params):
        '''
        It reads each file in 'file_to_process' in chunks of at most
        'line_threshold' physical lines, and calls 'fun(chunk, **params)' on every
        chunk. Each chunk is a string starting with the header of the file it
        comes from. The list of the values returned by 'fun' is returned.

        Chunks are split on physical lines, thus a quoted field containing a
        newline may be broken if it falls across a chunk boundary.
        '''
        result = []
        for csv_path in file_to_process:
            # The header is tracked per file: the first line of *every* file is
            # its header and must never be handed to 'fun' as a data row.
            header = None
            chunk = []
            with open(csv_path, encoding='utf-8') as f:
                # Iterate the file lazily instead of loading all its lines.
                for idx, line in enumerate(f):
                    if idx == 0:
                        header = line
                        chunk = [header]
                        continue
                    if idx % line_threshold == 0:
                        result.append(fun(''.join(chunk), **params))
                        chunk = [header]
                    chunk.append(line)
            # Flush what is left (an empty string for an empty file).
            result.append(fun(''.join(chunk), **params))
        return result

    def dump_data(self, file_name:str) -> None:
        '''
        It appends all the pairs queued by 'add_value' to the file 'file_name' in
        the output path, writing the header first if the file does not exist yet,
        and then it empties the queue.
        '''
        path = join(self.output_path, file_name)
        is_new_file = not exists(path)
        with open(path, 'a', encoding='utf-8', newline='') as f:
            if is_new_file:
                f.write('"id","value"\n')
            csv_writer = writer(f, delimiter=',')
            for el in self.data_to_store:
                # The csv writer already quotes fields and doubles embedded
                # quotes: escaping them by hand as well would corrupt the
                # values when they are read back.
                csv_writer.writerow([el[0], el[1]])
        self.data_to_store = list()

    def get_value(self, id_string):
        '''
        It returns a copy of the set of values associated to the input
        'id_string', or None if 'id_string' is not included in the loaded data.
        '''
        if id_string in self.data:
            return set(self.data[id_string])
        return None

    def add_value(self, id_string, value):
        '''
        It adds the value specified in the set of values associated to 'id_string'.
        If the value was not already there, the pair is also queued in
        'data_to_store', so that the next call to 'dump_data' writes it in a CSV.
        '''
        values = self.data.setdefault(id_string, set())
        if value not in values:
            values.add(value)
            self.data_to_store.append([id_string, value])

    def __index_rows(self, rows) -> None:
        '''
        It stores in 'self.data' the 'value' of each row under the 'id' of that row.
        '''
        for row in rows:
            self.data.setdefault(row['id'], set()).add(row['value'])

    def __load_csv(self):
        '''
        It indexes the rows of all the existing files, parsing each file in one go.
        '''
        for file in self.existing_files:
            with open(file, 'r', encoding='utf-8') as f:
                self.__index_rows(DictReader(f))

    def __low_memory_load(self, csv_string:str):
        '''
        It indexes the rows of a single CSV chunk (header included).
        '''
        self.__index_rows(DictReader(StringIO(csv_string), delimiter=','))