Coverage for oc_ds_converter / lib / file_manager.py: 36%
141 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
6from __future__ import annotations
8import csv
9import json
10import os
11import sys
12from contextlib import contextmanager
13from datetime import datetime
14from pathlib import Path
15from time import sleep
16from collections.abc import Iterator, Mapping
17from typing import Dict, List, Set
18from zipfile import ZIP_DEFLATED, ZipFile
20from _collections_abc import dict_keys
21from bs4 import BeautifulSoup
22from requests import ReadTimeout, get
23from requests.exceptions import ConnectionError
25from oc_ds_converter.lib.cleaner import Cleaner
def get_csv_data(filepath:str) -> List[Dict[str, str]]:
    """Read a CSV file into a list of row dictionaries.

    Lines are cleaned (space normalization, NUL removal) before parsing.
    If a field exceeds the csv module's field size limit, the limit is
    doubled and the read retried; the process-wide limit is restored
    afterwards regardless of outcome.

    :param filepath: path of the CSV file to read
    :returns: one dict per data row keyed by the header row; an empty
        list for non-CSV paths
    """
    # NOTE(review): endswith('.csv') also accepts extensions like '.xcsv';
    # kept as-is to preserve existing accept/reject behavior.
    if not os.path.splitext(filepath)[1].endswith('.csv'):
        return list()
    original_field_size = csv.field_size_limit()
    cur_field_size = original_field_size
    data = list()
    try:
        while True:
            try:
                with open(filepath, 'r', encoding='utf8') as data_initial:
                    valid_data = (Cleaner(line).normalize_spaces().replace('\0','') for line in data_initial)
                    data = list(csv.DictReader(valid_data, delimiter=','))
                # A successful parse (even of an empty file) ends the retry
                # loop; the original `while not data` spun forever on a CSV
                # with no data rows.
                break
            except csv.Error:
                cur_field_size *= 2
                csv.field_size_limit(cur_field_size)
    finally:
        # Restore the saved limit so other readers in the process are
        # unaffected (the original reset it to 128, clobbering the default).
        csv.field_size_limit(original_field_size)
    return data
def pathoo(path):
    """Ensure the parent directory of *path* exists, creating it if needed.

    :param path: a file path whose parent directory should exist
    :returns: None
    """
    parent = os.path.dirname(path)
    # A bare filename has no parent component; the original called
    # os.makedirs('') in that case, which raises FileNotFoundError.
    if parent:
        # exist_ok avoids the TOCTOU race between an isdir check and makedirs.
        os.makedirs(parent, exist_ok=True)
def write_csv(path:str, datalist:List[dict], fieldnames:list|dict_keys|None=None, method:str='w') -> None:
    """Write a list of row dictionaries to a CSV file.

    The parent directory is created if missing. The header row is emitted
    for a fresh write, or when appending to a file that does not exist yet.

    :param path: destination CSV file path
    :param datalist: rows to write; nothing happens if empty
    :param fieldnames: column order; defaults to the keys of the first row
    :param method: file open mode ('w' to overwrite, 'a' to append)
    :returns: None
    """
    if not datalist:
        return
    header = datalist[0].keys() if fieldnames is None else fieldnames
    pathoo(path)
    already_there = os.path.isfile(path)
    with open(path, method, newline='', encoding='utf-8') as output_file:
        writer = csv.DictWriter(f=output_file, fieldnames=header, delimiter=',', quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
        if method == 'w' or (method == 'a' and not already_there):
            writer.writeheader()
        writer.writerows(datalist)
def normalize_path(path:str) -> str:
    """Return *path* with both '\\\\' and '/' separators converted to the
    platform separator (os.sep)."""
    return path.replace('\\', '/').replace('/', os.sep)
66def init_cache(cache_filepath: str | None) -> Set[str]:
67 if not cache_filepath or not os.path.exists(cache_filepath):
68 return set()
69 with open(cache_filepath, 'r', encoding='utf-8') as cache_file:
70 cache_data = json.load(cache_file)
71 if not cache_data:
72 return set()
73 citing = set(cache_data.get("citing", []))
74 cited = set(cache_data.get("cited", []))
75 return citing & cited
@contextmanager
def suppress_stdout():
    """Context manager that silences stdout by redirecting it to os.devnull
    for the duration of the with-block, restoring it afterwards."""
    with open(os.devnull, 'w') as sink: #pragma: no cover
        saved = sys.stdout
        sys.stdout = sink
        try:
            yield
        finally:
            # Always restore, even if the body raised.
            sys.stdout = saved
def sort_files(files_to_be_processed:list) -> list:
    """Sort CSV filenames numerically when their names permit it.

    If every name is '<digits>.csv', sort by that number; otherwise, if
    every name ends in '_<digits>.csv', sort by the trailing number.
    Anything else is returned in its original order.
    """
    def _stem(name: str) -> str:
        return name.replace('.csv', '')

    if all(_stem(name).isdigit() for name in files_to_be_processed):
        return sorted(files_to_be_processed, key=lambda name: int(_stem(name)))
    if all(_stem(name.split('_')[-1]).isdigit() for name in files_to_be_processed):
        return sorted(files_to_be_processed, key=lambda name: int(_stem(name.split('_')[-1])))
    return files_to_be_processed
def zipdir(path, ziph):
    """Recursively add every file under *path* to the open ZipFile *ziph*,
    with archive names relative to the parent of *path* (so the top-level
    directory name is kept inside the archive)."""
    base = os.path.join(path, '..')
    for root, _, filenames in os.walk(path):
        for name in filenames:
            full = os.path.join(root, name)
            ziph.write(full, os.path.relpath(full, base))
def zipit(dir_list:list, zip_name:str) -> None:
    """Create one zip archive containing the trees of all given directories.

    :param dir_list: directories whose contents are added (via zipdir)
    :param zip_name: path of the archive to create
    :returns: None
    """
    # Context manager guarantees the archive is closed even if zipdir
    # raises mid-way; the original leaked the handle (and could leave a
    # truncated archive) on error. Also renamed the loop variable, which
    # shadowed the builtin `dir`.
    with ZipFile(file=zip_name, mode='w', compression=ZIP_DEFLATED, allowZip64=True) as zipf:
        for directory in dir_list:
            zipdir(directory, zipf)
def zip_files_in_dir(src_dir:str, dst_dir:str, replace_files:bool=False) -> None:
    '''
    This method zips files individually in all directories starting from a specified root directory.
    In other words, this function does not zip the entire folder but individual files
    while maintaining the folder hierarchy in the specified output directory.

    :params src_dir: the source directory
    :type src_dir: str
    :params dst_dir: the destination directory
    :type dst_dir: str
    :params replace_files: True if you want to replace the original unzipped files with their zipped versions. The default value is False
    :type replace_files: bool
    :returns: None
    '''
    for dirpath, _, filenames in os.walk(src_dir):
        # Mirror the source hierarchy under dst_dir. relpath also handles
        # files directly under src_dir: the old prefix-strip left the
        # absolute source path intact there, so os.path.join silently
        # dropped dst_dir and wrote the zips next to the sources.
        dst_path = os.path.join(dst_dir, os.path.relpath(dirpath, src_dir))
        for filename in filenames:
            src_path = os.path.join(dirpath, filename)
            os.makedirs(dst_path, exist_ok=True)
            # Replace only the final extension. The original used
            # str.replace(ext, '.zip') on the whole path, which corrupted
            # paths containing the extension elsewhere and, for
            # extension-less files, ran replace('', '.zip') — inserting
            # '.zip' between every character.
            stem = os.path.splitext(filename)[0]
            zip_path = os.path.join(dst_path, stem + '.zip')
            with ZipFile(file=zip_path, mode='w', compression=ZIP_DEFLATED, allowZip64=True) as zipf:
                zipf.write(src_path, arcname=filename)
            if replace_files:
                os.remove(src_path)
def unzip_files_in_dir(src_dir:str, dst_dir:str, replace_files:bool=False) -> None:
    '''
    This method unzips zipped files individually in all directories starting from a specified root directory.
    In other words, this function does not unzip the entire folder but individual files
    while maintaining the folder hierarchy in the specified output directory.

    :params src_dir: the source directory
    :type src_dir: str
    :params dst_dir: the destination directory
    :type dst_dir: str
    :params replace_files: True if you want to replace the original zipped files with their unzipped versions, defaults to [False]
    :type replace_files: bool
    :returns: None
    '''
    for dirpath, _, filenames in os.walk(src_dir):
        # Mirror the source hierarchy under dst_dir; relpath also covers
        # archives directly under src_dir, which the old prefix-strip missed.
        dst_path = os.path.join(dst_dir, os.path.relpath(dirpath, src_dir))
        for filename in filenames:
            if os.path.splitext(filename)[1] != '.zip':
                continue
            src_path = os.path.join(dirpath, filename)
            os.makedirs(dst_path, exist_ok=True)
            # Open the archive at its SOURCE location. The original opened
            # os.path.join(dst_path, filename), which does not exist
            # whenever dst_dir differs from src_dir.
            with ZipFile(file=src_path, mode='r') as zipf:
                zipf.extractall(dst_path)
            if replace_files:
                os.remove(src_path)
166def read_zipped_json(filepath:str) -> dict|None:
167 '''
168 This method reads a zipped json file.
170 :params filepath: the zipped json file path
171 :type src_dir: str
172 :returns: dict -- It returns the json file as a dictionary
173 '''
174 with ZipFile(filepath, 'r') as zipf:
175 for filename in zipf.namelist():
176 with zipf.open(filename) as f:
177 json_data = f.read()
178 json_dict = json.loads(json_data.decode("utf-8"))
179 return json_dict
def call_api(url: str, headers: Mapping[str, str | bytes] | None, r_format: str = "json") -> dict | BeautifulSoup | None:
    """Fetch *url* with up to three attempts and parse the response.

    A 200 response is decoded as UTF-8 and parsed per *r_format*
    ("json" -> dict, anything else -> BeautifulSoup XML). A 404 returns
    None immediately. Read timeouts retry at once; connection errors
    retry after a 5-second pause; other status codes also trigger a
    retry. After three failed attempts, None is returned.
    """
    for _ in range(3):
        try:
            response = get(url, headers=headers, timeout=30)
        except ReadTimeout:
            # Transient timeout: retry immediately.
            continue
        except ConnectionError:
            # Back off briefly, then retry.
            sleep(5)
            continue
        if response.status_code == 200:
            response.encoding = "utf-8"
            if r_format == "json":
                return json.loads(response.text)
            return BeautifulSoup(response.text, 'xml')
        if response.status_code == 404:
            return None
    return None
def rm_tmp_csv_files(base_dir:str) -> None:
    """Keep only the most recent CSV per numeric prefix in *base_dir*.

    Filenames are expected as '<number>_<%Y-%m-%dT%H-%M-%S>.csv'; for each
    <number>, every file strictly older than the newest is deleted. Files
    tied for the newest timestamp are all kept (matching the original
    pairwise behavior).

    :param base_dir: directory containing the temporary CSV files
    :returns: None
    """
    # Group timestamps by number in one O(n) pass. The original O(n^2)
    # pairwise scan could call os.remove twice on the same file
    # (FileNotFoundError) when three or more versions shared a number.
    groups = {}
    for filename in os.listdir(base_dir):
        number = filename.split('_')[0]
        date = datetime.strptime(filename.split('_')[1].replace('.csv', ''), '%Y-%m-%dT%H-%M-%S')
        groups.setdefault(number, []).append((date, filename))
    for versions in groups.values():
        latest = max(date for date, _ in versions)
        for date, filename in versions:
            if date < latest:
                os.remove(os.path.join(base_dir, filename))
def chunks(lst: list, n: int) -> Iterator[list]:
    """Yield successive slices of *lst* of length *n* (the last slice may
    be shorter)."""
    start = 0
    while start < len(lst):
        yield lst[start:start + n]
        start += n