Coverage for oc_ds_converter / lib / file_manager.py: 36%

141 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5 

6from __future__ import annotations 

7 

8import csv 

9import json 

10import os 

11import sys 

12from contextlib import contextmanager 

13from datetime import datetime 

14from pathlib import Path 

15from time import sleep 

16from collections.abc import Iterator, Mapping 

17from typing import Dict, List, Set 

18from zipfile import ZIP_DEFLATED, ZipFile 

19 

20from _collections_abc import dict_keys 

21from bs4 import BeautifulSoup 

22from requests import ReadTimeout, get 

23from requests.exceptions import ConnectionError 

24 

25from oc_ds_converter.lib.cleaner import Cleaner 

26 

27 

def get_csv_data(filepath:str) -> List[Dict[str, str]]:
    """
    Read a CSV file into a list of row dictionaries.

    Each line is cleaned (spaces normalised, NUL bytes stripped) before being
    handed to csv.DictReader. If a field exceeds the current csv field-size
    limit, the limit is doubled and the whole read is retried.

    :param filepath: path of the file to read; non-.csv paths yield []
    :returns: the parsed rows (an empty list for an empty or non-CSV file)
    """
    if not os.path.splitext(filepath)[1].endswith('.csv'):
        return list()
    # Remember the limit currently in force so it can be restored afterwards:
    # the previous code reset it to a hard-coded 128, clobbering the
    # interpreter default (131072) for every later csv operation.
    original_limit = csv.field_size_limit()
    cur_field_size = 128
    data = list()
    while True:
        try:
            with open(filepath, 'r', encoding='utf8') as data_initial:
                valid_data = (Cleaner(line).normalize_spaces().replace('\0','') for line in data_initial)
                data = list(csv.DictReader(valid_data, delimiter=','))
            # A clean parse ends the loop even when the file has no data rows;
            # the old `while not data` condition spun forever on such files.
            break
        except csv.Error:
            cur_field_size *= 2
            csv.field_size_limit(cur_field_size)
    csv.field_size_limit(original_limit)
    return data

46 

def pathoo(path):
    """
    Ensure the parent directory of *path* exists, creating it if needed.

    :param path: a file path whose parent directory should exist
    :returns: None
    """
    parent = os.path.dirname(path)
    # A bare filename has no parent component; the old code passed '' to
    # os.makedirs in that case and crashed with FileNotFoundError.
    if parent:
        # exist_ok avoids the race between an isdir check and makedirs.
        os.makedirs(parent, exist_ok=True)

50 

def write_csv(path:str, datalist:List[dict], fieldnames:list|dict_keys|None=None, method:str='w') -> None:
    """
    Dump *datalist* to a CSV file at *path*, creating parent folders if needed.

    The header row is emitted for a fresh write, or when appending to a file
    that did not exist before this call. An empty *datalist* is a no-op.

    :param path: destination file path
    :param datalist: rows to write, one dict per row
    :param fieldnames: column order; defaults to the keys of the first row
    :param method: open mode, 'w' to overwrite or 'a' to append
    :returns: None
    """
    if not datalist:
        return
    header = fieldnames if fieldnames is not None else datalist[0].keys()
    pathoo(path)
    already_there = os.path.isfile(path)
    with open(path, method, newline='', encoding='utf-8') as output_file:
        writer = csv.DictWriter(f=output_file, fieldnames=header, delimiter=',', quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
        if method == 'w' or (method == 'a' and not already_there):
            writer.writeheader()
        writer.writerows(datalist)

61 

def normalize_path(path:str) -> str:
    """
    Rewrite both backslash and forward-slash separators in *path* to os.sep.

    :param path: the path string to normalise
    :returns: the path using the platform's native separator
    """
    forward_slashed = path.replace('\\', '/')
    return forward_slashed.replace('/', os.sep)

65 

def init_cache(cache_filepath: str | None) -> Set[str]:
    """
    Load the processing cache and return the identifiers present in BOTH the
    'citing' and 'cited' lists (set intersection).

    :param cache_filepath: path of the JSON cache file, or None
    :returns: the intersection set; empty if the file is missing or empty
    """
    if cache_filepath and os.path.exists(cache_filepath):
        with open(cache_filepath, 'r', encoding='utf-8') as cache_file:
            stored = json.load(cache_file)
        if stored:
            return set(stored.get("citing", [])) & set(stored.get("cited", []))
    return set()

76 

@contextmanager
def suppress_stdout():
    """
    Context manager that silences sys.stdout for the duration of the block
    by pointing it at os.devnull, restoring the previous stream on exit.
    """
    saved_stdout = sys.stdout
    with open(os.devnull, 'w') as sink:  # pragma: no cover
        sys.stdout = sink
        try:
            yield
        finally:
            # Restore the original stream even if the body raised.
            sys.stdout = saved_stdout

86 

def sort_files(files_to_be_processed:list) -> list:
    """
    Sort CSV file names numerically when their stems allow it.

    If every name is a bare number ('12.csv') they are sorted by that number;
    otherwise, if every name ends in '_<number>.csv' they are sorted by that
    trailing number. Any other mix is returned unchanged.

    :param files_to_be_processed: the file names to sort
    :returns: the (possibly) sorted list
    """
    def plain_stem(name):
        return name.replace('.csv', '')

    def trailing_stem(name):
        return name.split('_')[-1].replace('.csv', '')

    if all(plain_stem(name).isdigit() for name in files_to_be_processed):
        return sorted(files_to_be_processed, key=lambda name: int(plain_stem(name)))
    if all(trailing_stem(name).isdigit() for name in files_to_be_processed):
        return sorted(files_to_be_processed, key=lambda name: int(trailing_stem(name)))
    return files_to_be_processed

93 

def zipdir(path, ziph):
    """
    Recursively add every file under *path* to the open ZipFile *ziph*.

    Entries are stored relative to the parent of *path*, so the top-level
    folder name is preserved inside the archive.

    :param path: root directory to walk
    :param ziph: an open, writable zipfile.ZipFile
    :returns: None
    """
    archive_base = os.path.join(path, '..')
    for root, _, files in os.walk(path):
        for name in files:
            abs_path = os.path.join(root, name)
            ziph.write(abs_path, os.path.relpath(abs_path, archive_base))

100 

def zipit(dir_list:list, zip_name:str) -> None:
    """
    Create a single zip archive containing every directory in *dir_list*.

    :param dir_list: directories to add (each via zipdir)
    :param zip_name: path of the archive to create
    :returns: None
    """
    # Context manager guarantees the archive is closed (and its central
    # directory flushed) even if zipdir raises; the previous explicit
    # close() was skipped on error. Also avoids shadowing builtin `dir`.
    with ZipFile(file=zip_name, mode='w', compression=ZIP_DEFLATED, allowZip64=True) as zipf:
        for directory in dir_list:
            zipdir(directory, zipf)

106 

def zip_files_in_dir(src_dir:str, dst_dir:str, replace_files:bool=False) -> None:
    '''
    This method zips files individually in all directories starting from a specified root directory.
    In other words, this function does not zip the entire folder but individual files
    while maintaining the folder hierarchy in the specified output directory.

    :params src_dir: the source directory
    :type src_dir: str
    :params dst_dir: the destination directory
    :type dst_dir: str
    :params replace_files: True if you want to replace the original unzipped files with their zipped versions. The default value is False
    :type replace_files: bool
    :returns: None
    '''
    for dirpath, _, filenames in os.walk(src_dir):
        for filename in filenames:
            src_path = os.path.join(dirpath, filename)
            dst_path = os.path.join(
                dst_dir,
                str(Path(src_path).parent)
                .replace(f'{src_dir}{os.sep}', ''))
            os.makedirs(dst_path, exist_ok=True)
            # Swap only the FINAL extension for '.zip'. The previous
            # str.replace(ext, '.zip') replaced the first occurrence of the
            # extension anywhere in the full path, and for an extension-less
            # file (ext == '') it inserted '.zip' between every character.
            stem, _ext = os.path.splitext(filename)
            zip_path = os.path.join(dst_path, stem + '.zip')
            with ZipFile(file=zip_path, mode='w', compression=ZIP_DEFLATED, allowZip64=True) as zipf:
                zipf.write(src_path, arcname=filename)
            if replace_files:
                os.remove(src_path)

136 

def unzip_files_in_dir(src_dir:str, dst_dir:str, replace_files:bool=False) -> None:
    '''
    This method unzips zipped files individually in all directories starting from a specified root directory.
    In other words, this function does not unzip the entire folder but individual files
    while maintaining the folder hierarchy in the specified output directory.

    :params src_dir: the source directory
    :type src_dir: str
    :params dst_dir: the destination directory
    :type dst_dir: str
    :params replace_files: True if you want to replace the original zipped files with their unzipped versions, defaults to [False]
    :type replace_files: bool
    :returns: None
    '''
    for dirpath, _, filenames in os.walk(src_dir):
        for filename in filenames:
            if os.path.splitext(filename)[1] == '.zip':
                src_path = os.path.join(dirpath, filename)
                dst_path = os.path.join(
                    dst_dir,
                    str(Path(src_path).parent)
                    .replace(f'{src_dir}{os.sep}', ''))
                os.makedirs(dst_path, exist_ok=True)
                # Read the archive from its SOURCE location. The previous
                # code opened os.path.join(dst_path, filename), which only
                # exists when src_dir == dst_dir and otherwise raised
                # FileNotFoundError.
                with ZipFile(file=src_path, mode='r') as zipf:
                    zipf.extractall(dst_path)
                if replace_files:
                    os.remove(src_path)

165 

def read_zipped_json(filepath:str) -> dict|None:
    '''
    This method reads a zipped json file.

    :params filepath: the zipped json file path
    :type filepath: str
    :returns: dict -- It returns the json file as a dictionary (the first
        archive member only); None if the archive has no members
    '''
    with ZipFile(filepath, 'r') as archive:
        for member in archive.namelist():
            raw = archive.read(member)
            return json.loads(raw.decode("utf-8"))

180 

def call_api(url: str, headers: Mapping[str, str | bytes] | None, r_format: str = "json") -> dict | BeautifulSoup | None:
    """
    Fetch *url* with up to three attempts, parsing the body on success.

    :param url: the endpoint to GET
    :param headers: HTTP headers to send, or None
    :param r_format: 'json' to parse as JSON, anything else parses as XML soup
    :returns: the parsed payload, or None on 404 / exhausted retries
    """
    for _ in range(3):
        try:
            response = get(url, headers=headers, timeout=30)
        except ReadTimeout:
            # Try again immediately.
            continue
        except ConnectionError:
            # Back off for 5 seconds, then try again.
            sleep(5)
            continue
        if response.status_code == 200:
            response.encoding = "utf-8"
            if r_format == "json":
                return json.loads(response.text)
            return BeautifulSoup(response.text, 'xml')
        if response.status_code == 404:
            return None
        # Any other status: fall through and retry.
    return None

199 

def rm_tmp_csv_files(base_dir:str) -> None:
    """
    Delete outdated temporary CSV files from *base_dir*.

    File names look like '<number>_<YYYY-MM-DDTHH-MM-SS>.csv'. Within each
    group of files sharing the same <number> prefix, every file strictly
    older than another file of the group is removed.

    :param base_dir: directory containing the temporary CSV files
    :returns: None
    """
    # Parse each name once instead of re-parsing inside a nested loop.
    entries = []
    for filename in os.listdir(base_dir):
        number = filename.split('_')[0]
        date = datetime.strptime(filename.split('_')[1].replace('.csv', ''), '%Y-%m-%dT%H-%M-%S')
        entries.append((number, date, filename))
    # Collect names first so no file is unlinked twice: the previous code
    # called os.remove inside the comparison loop and crashed with
    # FileNotFoundError when three or more files shared a number prefix
    # (the oldest was removed once per newer sibling).
    outdated = set()
    for number, date, filename in entries:
        for other_number, other_date, other_filename in entries:
            if number == other_number and filename != other_filename:
                if date < other_date:
                    outdated.add(filename)
                elif other_date < date:
                    outdated.add(other_filename)
    for filename in outdated:
        os.remove(os.path.join(base_dir, filename))

212 

def chunks(lst: list, n: int) -> Iterator[list]:
    """
    Yield successive *n*-sized slices of *lst*; the final slice may be shorter.

    :param lst: the list to split
    :param n: the chunk size (must be positive)
    :returns: an iterator over the slices
    """
    start = 0
    while start < len(lst):
        yield lst[start:start + n]
        start += n