Coverage for oc_meta/lib/csvmanager.py: 91%

94 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2019, Silvio Peroni <essepuntato@gmail.com> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17from csv import DictReader, writer 

18from io import StringIO 

19from os import mkdir, sep, walk 

20from os.path import exists, isdir, join 

21from typing import Dict 

22 

23 

class CSVManager(object):
    '''
    This class loads simple CSV files composed of two fields, 'id' and
    'value', and indexes their rows in memory as a mapping from each id to the
    set of its values, so that they can be easily queried. New pairs can be
    queued with 'add_value' and appended to a CSV file with 'dump_data'.
    '''
    def __init__(self, output_path:str=None, line_threshold=10000, low_memory:bool=False):
        '''
        It creates the manager and, if 'output_path' is given, indexes every
        '.csv' file found under that directory (the directory is created when
        it does not exist yet).

        'line_threshold' is the number of physical lines per chunk used when
        'low_memory' is True; it must be a positive integer. When 'low_memory'
        is False each file is read in a single pass.
        '''
        self.output_path = output_path
        self.data:Dict[str, set] = {}
        self.data_to_store = list()
        # Always defined, so the attribute can be read even without an output path.
        self.existing_files = []
        if output_path is not None:
            self.existing_files = self.__get_existing_files()
            if low_memory:
                self.__load_all_csv_files(self.existing_files, fun=self.__low_memory_load, line_threshold=line_threshold)
            else:
                self.__load_csv()

    @staticmethod
    def __find_csv_files(dir_path:str) -> list:
        '''
        It returns the paths of all the files ending with '.csv' found by
        walking 'dir_path' recursively.
        '''
        return [
            cur_dir + sep + cur_file
            for cur_dir, _, cur_files in walk(dir_path)
            for cur_file in cur_files
            if cur_file.endswith('.csv')
        ]

    def __get_existing_files(self) -> list:
        '''
        It returns the CSV files already stored under 'self.output_path'. If
        that path does not exist, it is created and an empty list is returned.
        '''
        if exists(self.output_path):
            return CSVManager.__find_csv_files(self.output_path)
        # Imported locally because the module only imports 'mkdir' from 'os';
        # 'makedirs' also creates the missing parent directories.
        from os import makedirs
        makedirs(self.output_path, exist_ok=True)
        return []

    @staticmethod
    def load_csv_column_as_set(file_or_dir_path:str, key:str, line_threshold:int=10000):
        '''
        It returns the set of all the values found in the column 'key' of the
        CSV file 'file_or_dir_path' or, if it is a directory, of all the '.csv'
        files found under it. An empty set is returned if the path does not
        exist. A KeyError is raised if a row does not have the column 'key'.
        '''
        result = set()
        if exists(file_or_dir_path):
            if isdir(file_or_dir_path):
                file_to_process = CSVManager.__find_csv_files(file_or_dir_path)
            else:
                file_to_process = [file_or_dir_path]
            for item in CSVManager.__load_all_csv_files(file_to_process, CSVManager.__load_csv_by_key, line_threshold=line_threshold, key=key):
                result.update(item)
        return result

    @staticmethod
    def __load_csv_by_key(csv_string, key):
        '''
        It parses 'csv_string' as a comma-separated CSV with a header row and
        returns the set of the values found in the column 'key'.
        '''
        csv_metadata = DictReader(StringIO(csv_string), delimiter=',')
        return {row[key] for row in csv_metadata}

    @staticmethod
    def __load_all_csv_files(file_to_process, fun, line_threshold, **params):
        '''
        It reads each file in 'file_to_process' in chunks of about
        'line_threshold' physical lines, calls 'fun(chunk, **params)' on each
        chunk (every chunk starts with the header of its own file) and returns
        the list of the values returned by 'fun'.
        '''
        result = []
        for csv_path in file_to_process:
            # newline='' leaves line endings untouched, as the csv module requires.
            with open(csv_path, encoding='utf-8', newline='') as f:
                # The header is tracked per file: keeping the one of a previous
                # file would make the header line of this file a data row.
                header = None
                chunk = []
                flush_due = False
                in_quotes = False
                # Iterating the file object streams it instead of loading it whole.
                for idx, line in enumerate(f):
                    if header is None:
                        header = line
                        chunk = [header]
                        continue
                    if idx % line_threshold == 0:
                        flush_due = True
                    # A chunk is never closed in the middle of a quoted field
                    # spanning several lines; the flush is deferred instead.
                    if flush_due and not in_quotes:
                        result.append(fun(''.join(chunk), **params))
                        chunk = [header]
                        flush_due = False
                    chunk.append(line)
                    # An odd number of quote characters toggles the quoted state
                    # (escaped quotes come in pairs and do not affect parity).
                    if line.count('"') % 2:
                        in_quotes = not in_quotes
            # Remaining lines of the file (an empty string for an empty file).
            result.append(fun(''.join(chunk), **params))
        return result

    def dump_data(self, file_name:str) -> None:
        '''
        It appends the pairs queued by 'add_value' to the file 'file_name'
        inside 'self.output_path', writing the header first if the file does
        not exist yet, and then empties the queue.
        '''
        path = join(self.output_path, file_name)
        is_new_file = not exists(path)
        with open(path, 'a', encoding='utf-8', newline='') as f:
            if is_new_file:
                f.write('"id","value"\n')
            csv_writer = writer(f, delimiter=',')
            for el in self.data_to_store:
                # The csv writer already escapes the quotes: doubling them by
                # hand would corrupt the values when they are read back.
                csv_writer.writerow([el[0], el[1]])
        self.data_to_store = list()

    def get_value(self, id_string):
        '''
        It returns a copy of the set of values associated to the input
        'id_string', or None if 'id_string' is not included in the index.
        '''
        if id_string in self.data:
            return set(self.data[id_string])
        return None

    def add_value(self, id_string, value):
        '''
        It adds the value specified to the set of values associated to
        'id_string'. If the value is new, the pair is also queued in
        'self.data_to_store', so that the next call to 'dump_data' writes it
        in a CSV file.
        '''
        values = self.data.setdefault(id_string, set())
        if value not in values:
            values.add(value)
            self.data_to_store.append([id_string, value])

    def __index_rows(self, rows) -> None:
        '''
        It adds the 'value' of each row in 'rows' to the set of values
        associated to the 'id' of that row.
        '''
        for row in rows:
            self.data.setdefault(row['id'], set()).add(row['value'])

    def __load_csv(self):
        '''
        It indexes all the files in 'self.existing_files', reading each of
        them in a single pass.
        '''
        for file in self.existing_files:
            # newline='' leaves line endings untouched, as the csv module requires.
            with open(file, 'r', encoding='utf-8', newline='') as f:
                self.__index_rows(DictReader(f))

    def __low_memory_load(self, csv_string:str):
        '''
        It indexes the rows of 'csv_string', a chunk of CSV text that starts
        with the header row.
        '''
        self.__index_rows(DictReader(StringIO(csv_string), delimiter=','))