Coverage for oc_meta / lib / file_manager.py: 78%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7 

8from __future__ import annotations 

9 

10import csv 

11import os 

12import sys 

13from contextlib import contextmanager 

14from pathlib import Path 

15from time import sleep 

16from typing import Callable, Dict, List, Set 

17from zipfile import ZIP_DEFLATED, ZipFile 

18 

19import orjson 

20from _collections_abc import dict_keys 

21from bs4 import BeautifulSoup 

22from requests import ReadTimeout, get 

23from requests.exceptions import ConnectionError 

24from scandir_rs import Walk # type: ignore[import-untyped] 

25 

26from oc_meta.lib.cleaner import normalize_spaces 

27 

28 

def collect_files(
    root: str,
    pattern: str = "*.zip",
    path_filter: Callable[[str], bool] | None = None,
) -> List[str]:
    """
    Traverse a directory tree and gather files whose names match a pattern.

    Iteration is delegated to scandir-rs (Rust-based) for speed.

    :param root: directory where the traversal starts
    :param pattern: glob pattern applied to filenames (e.g. '*.zip', 'se.zip')
    :param path_filter: optional predicate over the full file path; return
        True to keep the file, False to drop it, e.g.
        lambda p: 'prov' not in p  # exclude prov directories
    :returns: the list of matching file paths
    """
    # With no filter supplied, accept everything.
    accept = path_filter if path_filter is not None else (lambda _p: True)
    collected: List[str] = []
    for dirpath, _, filenames in Walk(root, file_include=[pattern]):
        candidates = (os.path.join(root, dirpath, name) for name in filenames)
        collected.extend(p for p in candidates if accept(p))
    return collected

53 

54 

def collect_zip_files(
    root: str,
    only_data: bool = False,
    only_prov: bool = False,
) -> List[str]:
    """
    Gather ZIP files from a directory tree.

    :param root: directory where the traversal starts
    :param only_data: keep only files whose path does NOT contain 'prov'
    :param only_prov: keep only files whose path contains 'prov'
    :returns: sorted list of ZIP file paths

    When both flags are False every ZIP file is collected; when both are
    True the selection is contradictory and the result is empty.
    """
    if only_data and only_prov:
        return []

    if only_data:
        def selector(p: str) -> bool:
            return "prov" not in p
    elif only_prov:
        def selector(p: str) -> bool:
            return "prov" in p
    else:
        selector = None

    return sorted(collect_files(root, "*.zip", selector))

81 

82 

def get_csv_data(filepath: str, clean_data: bool = True) -> List[Dict[str, str]]:
    """
    Read a CSV file into a list of row dictionaries.

    If a row holds a field larger than the active csv field-size limit, the
    limit is doubled (starting from its current value) and the read is
    retried; the original process-wide limit is always restored afterwards.

    :param filepath: path of the file to read; anything whose extension is
        not .csv yields an empty list
    :param clean_data: when True, strip NUL bytes and normalize whitespace
        on every line before parsing
    :returns: the rows of the CSV as dictionaries keyed by the header
    """
    if not os.path.splitext(filepath)[1].endswith(".csv"):
        return list()
    # Capture the active limit so it can be restored: the old code reset it
    # to the literal 128, silently shrinking the process-wide default
    # (131072) for every subsequent csv read in the process. It also started
    # doubling from 128, wasting iterations below the limit in force.
    original_limit = csv.field_size_limit()
    cur_field_size = original_limit
    limit_raised = False
    data: List[Dict[str, str]] = list()
    try:
        while True:
            try:
                with open(filepath, "r", encoding="utf8") as f:
                    if clean_data:
                        lines = (
                            normalize_spaces(line.replace("\0", "")) for line in f
                        )
                        data = list(csv.DictReader(lines, delimiter=","))
                    else:
                        data = list(csv.DictReader(f, delimiter=","))
                break
            except csv.Error:
                # Most likely "field larger than field limit": raise and retry.
                cur_field_size *= 2
                csv.field_size_limit(cur_field_size)
                limit_raised = True
    finally:
        # Restore the global limit even if an unrelated error escaped the loop.
        if limit_raised:
            csv.field_size_limit(original_limit)
    return data

107 

def pathoo(path):
    """
    Ensure that the parent directory of ``path`` exists, creating it if needed.

    :param path: a file path whose parent directories must exist
    """
    # exist_ok avoids the race between the old isdir() check and makedirs();
    # a path with no directory component (dirname == '') previously crashed
    # with FileNotFoundError inside os.makedirs('').
    dirname = os.path.dirname(path)
    if dirname:
        os.makedirs(dirname, exist_ok=True)

111 

def write_csv(path:str, datalist:List[dict], fieldnames:list|dict_keys|None=None, method:str='w') -> None:
    """
    Write a list of row dictionaries to a CSV file.

    :param path: destination file path (parent directories are created)
    :param datalist: rows to write; nothing happens when empty
    :param fieldnames: column order; defaults to the keys of the first row
    :param method: file open mode, 'w' to overwrite or 'a' to append
    """
    if not datalist:
        return
    header = datalist[0].keys() if fieldnames is None else fieldnames
    pathoo(path)
    already_there = os.path.isfile(path)
    with open(path, method, newline='', encoding='utf-8') as output_file:
        writer = csv.DictWriter(f=output_file, fieldnames=header, delimiter=',', quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
        # Emit the header when overwriting, or when appending to a file that
        # did not exist yet (so appends never duplicate it).
        if method == 'w' or (method == 'a' and not already_there):
            writer.writeheader()
        writer.writerows(datalist)

122 

def normalize_path(path:str) -> str:
    """
    Rewrite a path's separators to the current OS convention.

    Both forward and backward slashes are treated as separators and
    replaced with ``os.sep``.

    :param path: the path to normalize
    :returns: the normalized path
    """
    return os.sep.join(path.replace('\\', '/').split('/'))

126 

def init_cache(cache_filepath:str|None) -> Set[str]:
    """
    Load the set of completed items recorded in a cache file.

    :param cache_filepath: path of the cache file; when falsy an empty set
        is returned, when missing its parent directories are created
    :returns: one entry per line of the cache file, newline stripped
    """
    if not cache_filepath:
        return set()
    if not os.path.exists(cache_filepath):
        # First run: just make sure the cache's directory exists.
        pathoo(cache_filepath)
        return set()
    with open(cache_filepath, 'r', encoding='utf-8') as cache_file:
        return {entry.rstrip('\n') for entry in cache_file}

136 

@contextmanager
def suppress_stdout():
    """
    Context manager that silences everything written to stdout.

    Points sys.stdout at os.devnull for the duration of the block and puts
    the previous stream back afterwards, even when the body raises.
    """
    with open(os.devnull, 'w') as sink: #pragma: no cover
        previous = sys.stdout
        sys.stdout = sink
        try:
            yield
        finally:
            # Always restore the captured stream.
            sys.stdout = previous

146 

def sort_files(files_to_be_processed:list) -> list:
    """
    Sort CSV filenames numerically when they follow a numeric naming scheme.

    Handles names that are plain numbers ('12.csv') or that end with a
    numeric suffix ('dump_12.csv'); any other naming is returned unchanged.

    :param files_to_be_processed: the filenames to order
    :returns: the (possibly) sorted list of filenames
    """
    def _bare(name: str) -> str:
        return name.replace('.csv', '')

    def _suffix(name: str) -> str:
        return name.split('_')[-1].replace('.csv', '')

    names = files_to_be_processed
    if all(_bare(name).isdigit() for name in names):
        return sorted(names, key=lambda name: int(_bare(name)))
    if all(_suffix(name).isdigit() for name in names):
        return sorted(names, key=lambda name: int(_suffix(name)))
    return names

153 

def zipdir(path, ziph):
    """
    Recursively add every file under ``path`` to the open ZipFile ``ziph``.

    Archive names are made relative to the parent of ``path`` so the
    top-level directory name itself is kept inside the archive.
    """
    anchor = os.path.join(path, '..')
    for current_dir, _, entries in os.walk(path):
        for entry in entries:
            absolute = os.path.join(current_dir, entry)
            ziph.write(absolute, os.path.relpath(absolute, anchor))

160 

def zipit(dir_list:list, zip_name:str) -> None:
    """
    Zip the given directories (recursively) into a single archive.

    :param dir_list: directories whose contents are added to the archive
    :param zip_name: path of the ZIP archive to create
    """
    # The context manager guarantees the archive is closed (and its central
    # directory flushed) even if zipdir() raises; the original leaked the
    # open handle on error. 'directory' also avoids shadowing builtin dir.
    with ZipFile(file=zip_name, mode='w', compression=ZIP_DEFLATED, allowZip64=True) as zipf:
        for directory in dir_list:
            zipdir(directory, zipf)

166 

def zip_files_in_dir(src_dir:str, dst_dir:str, replace_files:bool=False) -> None:
    '''
    This method zips files individually in all directories starting from a
    specified root directory. In other words, it does not zip the entire
    folder but individual files, while maintaining the folder hierarchy in
    the specified output directory.

    :params src_dir: the source directory
    :type src_dir: str
    :params dst_dir: the destination directory
    :type dst_dir: str
    :params replace_files: True if you want to replace the original unzipped files with their zipped versions. The default value is False
    :type replace_files: bool
    :returns: None
    '''
    for dirpath, _, filenames in os.walk(src_dir):
        if not filenames:
            continue
        # Mirror dirpath's position relative to src_dir under dst_dir.
        # os.path.relpath is robust to trailing separators and places files
        # that live directly in src_dir at the top of dst_dir; the old string
        # replace left src_dir in the path in that case, misrouting output.
        rel_dir = os.path.relpath(dirpath, src_dir)
        dst_path = dst_dir if rel_dir == os.curdir else os.path.join(dst_dir, rel_dir)
        os.makedirs(dst_path, exist_ok=True)
        for filename in filenames:
            src_path = os.path.join(dirpath, filename)
            # Swap the extension with splitext: the old
            # join(...).replace(ext, '.zip') replaced the first occurrence of
            # the extension anywhere in the path (corrupting directories whose
            # names contained it) and, for extension-less files,
            # replace('', '.zip') inserted '.zip' between every character.
            base_name = os.path.splitext(filename)[0]
            zip_path = os.path.join(dst_path, base_name + '.zip')
            with ZipFile(file=zip_path, mode='w', compression=ZIP_DEFLATED, allowZip64=True) as zipf:
                zipf.write(src_path, arcname=filename)
            if replace_files:
                os.remove(src_path)

196 

def unzip_files_in_dir(src_dir:str, dst_dir:str, replace_files:bool=False) -> None:
    '''
    This method unzips zipped files individually in all directories starting
    from a specified root directory. In other words, it does not unzip the
    entire folder but individual files, while maintaining the folder
    hierarchy in the specified output directory.

    :params src_dir: the source directory
    :type src_dir: str
    :params dst_dir: the destination directory
    :type dst_dir: str
    :params replace_files: True if you want to replace the original zipped files with their unzipped versions, defaults to [False]
    :type replace_files: bool
    :returns: None
    '''
    for dirpath, _, filenames in os.walk(src_dir):
        # Mirror dirpath's position relative to src_dir under dst_dir;
        # relpath is robust to trailing separators, unlike the old string
        # replace, and puts archives found directly in src_dir at dst_dir's top.
        rel_dir = os.path.relpath(dirpath, src_dir)
        dst_path = dst_dir if rel_dir == os.curdir else os.path.join(dst_dir, rel_dir)
        for filename in filenames:
            if os.path.splitext(filename)[1] != '.zip':
                continue
            src_path = os.path.join(dirpath, filename)
            os.makedirs(dst_path, exist_ok=True)
            # Open the archive at its SOURCE location: the original opened
            # os.path.join(dst_path, filename), which exists only when
            # dst_dir == src_dir and failed with FileNotFoundError otherwise.
            with ZipFile(file=src_path, mode='r') as zipf:
                zipf.extractall(dst_path)
            if replace_files:
                os.remove(src_path)

225 

def read_zipped_json(filepath:str) -> dict|None:
    '''
    This method reads a zipped json file.

    :params filepath: the zipped json file path
    :type filepath: str
    :returns: dict -- It returns the json file as a dictionary, or None for
        an archive with no members
    '''
    with ZipFile(filepath, 'r') as archive:
        # The archive is expected to hold a single JSON document: parse and
        # return the first member found.
        for member in archive.namelist():
            raw = archive.read(member)
            return orjson.loads(raw)
    return None

240 

def call_api(url: str, headers: dict[str, str], r_format: str = "json") -> dict | BeautifulSoup | None:
    """
    Fetch a URL with up to three attempts and parse the response.

    :param url: the URL to request
    :param headers: HTTP headers to send
    :param r_format: 'json' to parse the body as JSON, anything else parses
        it as XML with BeautifulSoup
    :returns: the parsed body on HTTP 200, None on HTTP 404 or after all
        attempts are exhausted
    """
    for _ in range(3):
        try:
            response = get(url, headers=headers, timeout=30)
        except ReadTimeout:
            # Retry immediately on a read timeout.
            continue
        except ConnectionError:
            # Give the endpoint a moment to recover before retrying.
            sleep(5)
            continue
        if response.status_code == 200:
            response.encoding = "utf-8"
            if r_format == "json":
                return orjson.loads(response.text)
            return BeautifulSoup(response.text, "xml")
        if response.status_code == 404:
            # Missing resource: no point retrying.
            return None
    return None

259