Coverage for oc_meta/lib/file_manager.py: 70% (152 statements)
coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import csv
import json
import os
import sys
# KeysView from the public collections.abc module replaces the import of
# dict_keys from the private _collections_abc module
from collections.abc import KeysView
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from time import sleep
from typing import Dict, List, Set
from zipfile import ZIP_DEFLATED, ZipFile

from bs4 import BeautifulSoup
from requests import ReadTimeout, get
from requests.exceptions import ConnectionError

from oc_meta.lib.cleaner import Cleaner


def get_csv_data(filepath: str) -> List[Dict[str, str]]:
    # Process only files whose extension is exactly .csv
    if os.path.splitext(filepath)[1] != '.csv':
        return list()
    field_size_changed = False
    old_field_size = csv.field_size_limit()
    cur_field_size = 128
    data = list()
    while not data:
        try:
            with open(filepath, 'r', encoding='utf8') as data_initial:
                # Strip NUL bytes and normalize spaces line by line before parsing
                valid_data = (Cleaner(line.replace('\0', '')).normalize_spaces() for line in data_initial)
                data = list(csv.DictReader(valid_data, delimiter=','))
            if not data:
                break  # The file has no rows: stop instead of retrying forever
        except csv.Error:
            # A field exceeded the current limit: double it and try again
            cur_field_size *= 2
            csv.field_size_limit(cur_field_size)
            field_size_changed = True
    if field_size_changed:
        # Restore the limit that was in force before this call,
        # instead of lowering the global limit to 128
        csv.field_size_limit(old_field_size)
    return data

def get_csv_data_fast(filepath: str) -> List[Dict[str, str]]:
    """
    Fast CSV reader that only handles field_size_limit without data cleaning.
    Use this when you don't need data normalization, for better performance.
    """
    # Process only files whose extension is exactly .csv
    if os.path.splitext(filepath)[1] != '.csv':
        return list()

    field_size_changed = False
    old_field_size = csv.field_size_limit()
    cur_field_size = 128
    data = list()

    while not data:
        try:
            with open(filepath, 'r', encoding='utf8') as data_file:
                data = list(csv.DictReader(data_file, delimiter=','))
            if not data:
                break  # The file has no rows: stop instead of retrying forever
        except csv.Error:
            # A field exceeded the current limit: double it and try again
            cur_field_size *= 2
            csv.field_size_limit(cur_field_size)
            field_size_changed = True

    if field_size_changed:
        csv.field_size_limit(old_field_size)

    return data
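
# Usage sketch for the two CSV readers (illustrative, not part of the module;
# 'csv_dir/1.csv' is a hypothetical path):
#
#     rows = get_csv_data('csv_dir/1.csv')           # cleaned, normalized rows
#     raw_rows = get_csv_data_fast('csv_dir/1.csv')  # raw rows, faster
#     for row in rows:
#         print(row)  # each row is a Dict[str, str] keyed by the CSV header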

def pathoo(path: str) -> None:
    # Create the parent directory of the given path, if it does not exist yet
    dirname = os.path.dirname(path)
    if dirname and not os.path.isdir(dirname):
        os.makedirs(dirname, exist_ok=True)

def write_csv(path: str, datalist: List[dict], fieldnames: list | KeysView | None = None, method: str = 'w') -> None:
    if datalist:
        fieldnames = datalist[0].keys() if fieldnames is None else fieldnames
        pathoo(path)
        file_exists = os.path.isfile(path)
        with open(path, method, newline='', encoding='utf-8') as output_file:
            dict_writer = csv.DictWriter(f=output_file, fieldnames=fieldnames, delimiter=',', quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
            # Write the header for new files only, never when appending to an existing one
            if method == 'w' or (method == 'a' and not file_exists):
                dict_writer.writeheader()
            dict_writer.writerows(datalist)
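
# Usage sketch for write_csv (illustrative; the path and rows are hypothetical):
#
#     rows = [{'id': 'doi:10.1/a', 'title': 'First'},
#             {'id': 'doi:10.1/b', 'title': 'Second'}]
#     write_csv('out/result.csv', rows)              # 'w': writes the header
#     write_csv('out/result.csv', rows, method='a')  # 'a': appends, no header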

def normalize_path(path: str) -> str:
    # Convert any mix of Windows and POSIX separators to the current OS separator
    normal_path = path.replace('\\', '/').replace('/', os.sep)
    return normal_path
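
# For example, normalize_path('a/b\\c.csv') returns 'a\\b\\c.csv' on Windows
# and 'a/b/c.csv' on POSIX systems.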

def init_cache(cache_filepath: str | None) -> Set[str]:
    completed = set()
    if cache_filepath:
        if not os.path.exists(cache_filepath):
            # Make sure the parent directory exists, so the caller can create the file later
            pathoo(cache_filepath)
        else:
            with open(cache_filepath, 'r', encoding='utf-8') as cache_file:
                completed = {line.rstrip('\n') for line in cache_file}
    return completed
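
# Usage sketch for init_cache (illustrative; 'cache.txt' is a hypothetical file
# holding one completed filename per line):
#
#     completed = init_cache('cache.txt')
#     for filename in os.listdir('csv_dir'):
#         if filename in completed:
#             continue  # already processed in a previous run
#         # ... process the file, then record it in the cache:
#         with open('cache.txt', 'a', encoding='utf-8') as cache_file:
#             cache_file.write(filename + '\n')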

@contextmanager
def suppress_stdout():
    with open(os.devnull, 'w') as devnull:  # pragma: no cover
        old_stdout = sys.stdout
        sys.stdout = devnull
        try:
            yield
        finally:
            sys.stdout = old_stdout
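
# Usage sketch for suppress_stdout (illustrative):
#
#     with suppress_stdout():
#         print('this goes to os.devnull')  # silenced
#     print('stdout is restored here')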

def sort_files(files_to_be_processed: list) -> list:
    # Sort numerically when the filenames are purely numeric ('1.csv', '2.csv', ...)
    if all(filename.replace('.csv', '').isdigit() for filename in files_to_be_processed):
        files_to_be_processed = sorted(files_to_be_processed, key=lambda filename: int(filename.replace('.csv', '')))
    # Otherwise, sort by the numeric suffix after the last underscore ('name_1.csv', ...)
    elif all(filename.split('_')[-1].replace('.csv', '').isdigit() for filename in files_to_be_processed):
        files_to_be_processed = sorted(files_to_be_processed, key=lambda filename: int(filename.split('_')[-1].replace('.csv', '')))
    return files_to_be_processed
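
# For example (hypothetical filenames):
#
#     sort_files(['10.csv', '2.csv', '1.csv'])
#     # -> ['1.csv', '2.csv', '10.csv'] (numeric, not lexicographic, order)
#     sort_files(['batch_10.csv', 'batch_2.csv'])
#     # -> ['batch_2.csv', 'batch_10.csv'] (sorted by the numeric suffix)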

def zipdir(path, ziph):
    # Add every file under 'path' to the open ZipFile 'ziph',
    # storing names relative to the parent of 'path'
    for root, _, files in os.walk(path):
        for file in files:
            ziph.write(os.path.join(root, file),
                       os.path.relpath(os.path.join(root, file),
                                       os.path.join(path, '..')))

def zipit(dir_list: list, zip_name: str) -> None:
    with ZipFile(file=zip_name, mode='w', compression=ZIP_DEFLATED, allowZip64=True) as zipf:
        for folder in dir_list:
            zipdir(folder, zipf)
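
# Usage sketch for zipit (illustrative; the directory names are hypothetical):
#
#     zipit(['output/csv', 'output/rdf'], 'backup.zip')
#     # 'backup.zip' now contains both trees, each stored relative to its parent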

def zip_files_in_dir(src_dir: str, dst_dir: str, replace_files: bool = False) -> None:
    '''
    This method zips files individually in all directories starting from a specified root directory.
    In other words, it does not zip the entire folder but individual files,
    while maintaining the folder hierarchy in the specified output directory.

    :param src_dir: the source directory
    :type src_dir: str
    :param dst_dir: the destination directory
    :type dst_dir: str
    :param replace_files: True if you want to replace the original unzipped files with their zipped versions. The default value is False
    :type replace_files: bool
    :returns: None
    '''
    for dirpath, _, filenames in os.walk(src_dir):
        for filename in filenames:
            src_path = os.path.join(dirpath, filename)
            dst_path = os.path.join(
                dst_dir,
                str(Path(src_path).parent)
                    .replace(f'{src_dir}{os.sep}', ''))
            if not os.path.exists(dst_path):
                os.makedirs(dst_path)
            # Replace only the extension, not every occurrence of it in the name
            zip_path = os.path.join(dst_path, os.path.splitext(filename)[0] + '.zip')
            with ZipFile(file=zip_path, mode='w', compression=ZIP_DEFLATED, allowZip64=True) as zipf:
                zipf.write(src_path, arcname=filename)
            if replace_files:
                os.remove(src_path)
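
# Usage sketch for zip_files_in_dir (illustrative; the paths are hypothetical):
#
#     zip_files_in_dir('data/raw', 'data/zipped')
#     # data/raw/a/1.csv -> data/zipped/a/1.zip, and so on for every file,
#     # preserving the directory hierarchy under the destination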

def unzip_files_in_dir(src_dir: str, dst_dir: str, replace_files: bool = False) -> None:
    '''
    This method unzips zipped files individually in all directories starting from a specified root directory.
    In other words, it does not unzip the entire folder but individual files,
    while maintaining the folder hierarchy in the specified output directory.

    :param src_dir: the source directory
    :type src_dir: str
    :param dst_dir: the destination directory
    :type dst_dir: str
    :param replace_files: True if you want to replace the original zipped files with their unzipped versions. The default value is False
    :type replace_files: bool
    :returns: None
    '''
    for dirpath, _, filenames in os.walk(src_dir):
        for filename in filenames:
            if os.path.splitext(filename)[1] == '.zip':
                src_path = os.path.join(dirpath, filename)
                dst_path = os.path.join(
                    dst_dir,
                    str(Path(src_path).parent)
                        .replace(f'{src_dir}{os.sep}', ''))
                if not os.path.exists(dst_path):
                    os.makedirs(dst_path)
                # Read the archive from the source tree and extract it into the
                # destination (the original opened the archive under dst_path,
                # which only works when src_dir and dst_dir coincide)
                with ZipFile(file=src_path, mode='r') as zipf:
                    zipf.extractall(dst_path)
                if replace_files:
                    os.remove(src_path)
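
# The inverse round trip (illustrative; the paths are hypothetical):
#
#     unzip_files_in_dir('data/zipped', 'data/unzipped')
#     # data/zipped/a/1.zip -> data/unzipped/a/1.csv, mirroring the hierarchy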

def read_zipped_json(filepath: str) -> dict | None:
    '''
    This method reads a zipped json file.

    :param filepath: the zipped json file path
    :type filepath: str
    :returns: dict -- It returns the json file as a dictionary
    '''
    with ZipFile(filepath, 'r') as zipf:
        for filename in zipf.namelist():
            with zipf.open(filename) as f:
                json_data = f.read()
                json_dict = json.loads(json_data.decode('utf-8'))
                return json_dict  # Only the first member of the archive is read
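
# Usage sketch for read_zipped_json (illustrative; the path is hypothetical):
#
#     json_dict = read_zipped_json('rdf/br.zip')
#     if json_dict is not None:
#         print(json_dict.keys())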

def call_api(url: str, headers: dict, r_format: str = 'json') -> dict | BeautifulSoup | None:
    # Try the request up to three times before giving up
    tentative = 3
    while tentative:
        tentative -= 1
        try:
            r = get(url, headers=headers, timeout=30)
            if r.status_code == 200:
                r.encoding = 'utf-8'
                return json.loads(r.text) if r_format == 'json' else BeautifulSoup(r.text, 'xml')
            elif r.status_code == 404:
                return None
        except ReadTimeout:
            # Do nothing, just try again
            pass
        except ConnectionError:
            # Sleep 5 seconds, then try again
            sleep(5)
    return None
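
# Usage sketch for call_api (illustrative; the URL and header values are
# hypothetical):
#
#     result = call_api(
#         'https://api.crossref.org/works/10.1007/978-3-030-00668-6_8',
#         headers={'User-Agent': 'oc_meta'})
#     if result is not None:
#         print(result['message']['title'])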

def rm_tmp_csv_files(base_dir: str) -> None:
    # File names are expected to match '<number>_<%Y-%m-%dT%H-%M-%S>.csv';
    # for each number, only the most recent file is kept
    for filename in os.listdir(base_dir):
        if not os.path.exists(os.path.join(base_dir, filename)):
            continue  # Already removed in a previous iteration
        number = filename.split('_')[0]
        date = datetime.strptime(filename.split('_')[1].replace('.csv', ''), '%Y-%m-%dT%H-%M-%S')
        for other_filename in os.listdir(base_dir):
            other_number = other_filename.split('_')[0]
            other_date = datetime.strptime(other_filename.split('_')[1].replace('.csv', ''), '%Y-%m-%dT%H-%M-%S')
            if number == other_number and filename != other_filename:
                if date < other_date:
                    os.remove(os.path.join(base_dir, filename))
                    break  # 'filename' is gone, stop comparing it
                elif other_date < date:
                    os.remove(os.path.join(base_dir, other_filename))