Coverage for oc_ds_converter/preprocessing/jalc.py: 31%

160 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it>

# SPDX-FileCopyrightText: 2023-2024 Arianna Moretti <arianna.moretti4@unibo.it>

# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>

#

# SPDX-License-Identifier: ISC

6 

7import os 

8import shutil 

9import zipfile 

10from argparse import ArgumentParser 

11from os import makedirs, sep, walk 

12from os.path import basename, exists 

13 

14from concurrent.futures import ProcessPoolExecutor 

15from multiprocessing import get_context 

16 

17from tqdm import tqdm 

18 

19from oc_ds_converter.lib.file_manager import normalize_path 

20 

21 

def preprocessing(jalc_json_dir: str, output_dir: str, max_workers: int = 1) -> None:
    """Preprocess the original JALC zipped dump.

    The original dump has the following structure:
        jalc_dump.zip
            jalc_dataset_dir/
                prefixes.json
                prefix1.zip
                    prefix1_dir/
                        doiList1.json
                        doi1A.json
                        doi1B.json
                        ...
                prefix2.zip
                    prefix2_dir/
                        doiList2.json
                        doi2A.json
                        ...

    The preprocessing removes the intermediate folders (like jalc_dump,
    prefix1, prefix2 in the example above) and, when a prefix.zip contains
    more than 100 JSON files, splits it into (number of JSON files) // 100 + 1
    zips. When a prefix.zip is split, its doiList.json is copied only into
    the first sub-zip.

    :param jalc_json_dir: directory containing the original zipped dump
    :param output_dir: directory where the preprocessed files are stored;
        it is zipped, then removed, at the end of the process
    :param max_workers: number of parallel workers (1 = sequential)
    """
    # Collect elements to skip: files starting with "._" (macOS resource
    # forks), and any zip whose decompressed directory already exists.
    els_to_be_skipped = []
    for el in os.listdir(jalc_json_dir):  # usually one zip in the input dir
        if el.startswith("._"):
            els_to_be_skipped.append(os.path.join(jalc_json_dir, el))
        elif el.endswith(".zip"):
            base_name = el.replace('.zip', '')
            if [x for x in os.listdir(jalc_json_dir) if x.startswith(base_name) and x.endswith("decompr_zip_dir")]:
                els_to_be_skipped.append(os.path.join(jalc_json_dir, el))

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    all_unzipped_files = []
    # For every skipped zip, reuse its already-decompressed directory to
    # list all the elements of the original dump.
    els_to_be_skipped_cont = [x for x in els_to_be_skipped if x.endswith(".zip")]
    for el_to_skip in els_to_be_skipped_cont:
        # FIX: compare the basename, not the full joined path — the original
        # checked `el_to_skip.startswith("._")`, which is never true for an
        # absolute/joined path, so "._" zips were not actually skipped.
        if os.path.basename(el_to_skip).startswith("._"):
            continue
        base_name_el_to_skip = os.path.basename(el_to_skip).replace('.zip', '')
        for directory in os.listdir(jalc_json_dir):
            if directory == base_name_el_to_skip + "_decompr_zip_dir":
                for dir_lev2 in os.listdir(os.path.join(jalc_json_dir, directory)):
                    lev2_path = os.path.join(jalc_json_dir, directory, dir_lev2)
                    # FIX: extend instead of reassign — the original rebound
                    # `all_unzipped_files` on every iteration, keeping only
                    # the files of the last subdirectory visited.
                    all_unzipped_files.extend(
                        os.path.join(lev2_path, file)
                        for file in os.listdir(lev2_path)
                        if not file.startswith("._"))

    # If no already-decompressed content was found, extract the original
    # dump into "<dump basename>_decompr_zip_dir".
    if len(all_unzipped_files) == 0:
        for zip_lev0 in os.listdir(jalc_json_dir):
            if zip_lev0.endswith("zip") and not zip_lev0.startswith("._"):
                with zipfile.ZipFile(os.path.join(jalc_json_dir, zip_lev0), 'r') as zip_ref:
                    dest_dir = os.path.join(jalc_json_dir, zip_lev0).replace('.zip', '') + "_decompr_zip_dir"
                    if not exists(dest_dir):
                        makedirs(dest_dir)
                    zip_ref.extractall(dest_dir)
                print(f"Unzipped to {dest_dir}")
                for cur_dir, cur_subdir, cur_files in walk(dest_dir):
                    for cur_file in cur_files:
                        if not basename(cur_file).startswith("."):
                            all_unzipped_files.append(cur_dir + sep + cur_file)

    # Separate the per-prefix zips from the extra files.
    zip_files = [file for file in all_unzipped_files if file.endswith(".zip")]
    not_zip_files = [file for file in all_unzipped_files if not file.endswith(".zip")]

    # Copy all extra (non-zip) files as they are into the output dir.
    for file in not_zip_files:
        if os.path.isfile(file):
            shutil.copy(file, output_dir)

    # Process each prefix zip, optionally in parallel.
    if max_workers == 1:
        for zip_file in tqdm(zip_files, desc="Processing ZIP Files"):
            process_zip(zip_file, output_dir)
    else:
        with ProcessPoolExecutor(max_workers=max_workers, mp_context=get_context('spawn')) as executor:
            futures = [executor.submit(process_zip, zip_file, output_dir)
                       for zip_file in zip_files]
            for future in tqdm(futures, desc="Processing ZIP Files"):
                # FIX: .result() re-raises worker exceptions, which the
                # original fire-and-forget submit loop silently swallowed.
                future.result()

    # At the end of the process, archive the output_dir and delete the
    # original directory.
    print("Zipping the output directory")
    shutil.make_archive(output_dir, 'zip', output_dir)
    print("Removing output directory")
    shutil.rmtree(output_dir)

119 

120 

def process_zip(zip_file, output_dir2):
    """Repackage one prefix zip of the JALC dump into the output directory.

    A non-zip path is copied verbatim. A zip containing at most 100 citation
    JSON files is repackaged flat (intermediate folders dropped) as
    <output_dir2>/<prefix>.zip. A zip with more than 100 JSON files is split
    into slices of at most 100 files each, named <prefix>_0.zip,
    <prefix>_1.zip, ...; the doiList file is extracted only into the first
    slice.

    :param zip_file: path of the file to process
    :param output_dir2: destination directory
    """
    if not zip_file.endswith(".zip"):
        shutil.copy(zip_file, output_dir2)
        return

    zip_file_path = zip_file
    first_level_path = os.path.dirname(zip_file_path)
    print("executing", zip_file_path)
    # NOTE: the redundant zip_ref.close() at the end of the original body is
    # removed — the context manager already closes the archive.
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        all_files_in_zip = zip_ref.namelist()
        directories = [entry for entry in all_files_in_zip if entry.endswith('/')]
        # Citation JSON files (doiList index files are handled separately).
        file_names = [x for x in all_files_in_zip
                      if x.endswith(".json") and "doiList" not in x and x not in directories]
        extra_file_names = [x for x in all_files_in_zip if "doiList" in x or x.endswith(".out")]

        if len(file_names) <= 100:
            print("less than 100")
            zip_file_name = os.path.basename(zip_file_path).replace(".zip", "")
            dir_to_zip = os.path.join(output_dir2, zip_file_name)
            os.makedirs(dir_to_zip, exist_ok=True)
            _extract_doi_lists(zip_ref, extra_file_names, dir_to_zip)
            _extract_flat(zip_ref, file_names, dir_to_zip)
            _zip_and_remove(dir_to_zip)
        else:
            # More than 100 JSONs: split into as many zips as
            # len(file_names) // 100 (+1). The first sub-zip also receives
            # the extra doiList file stored in the original zip.
            print("more than 100: ", len(file_names))
            # Prefix name with path components and extension stripped.
            zip_basename = zip_file_path.replace(".zip", "").replace(first_level_path, "").replace("/", "").replace("\\", "")
            zip_part_counter = 0
            while file_names:
                # Destination folder for this slice (e.g. prefix_0).
                sliced_out_name = os.path.join(output_dir2, zip_basename + "_" + str(zip_part_counter))
                os.makedirs(sliced_out_name, exist_ok=True)
                # The doiList goes only into the first slice.
                if zip_part_counter == 0:
                    _extract_doi_lists(zip_ref, extra_file_names, sliced_out_name)
                if len(file_names) > 100:
                    print("json to be processed in this zip:", len(file_names))
                else:
                    print("last", len(file_names), "jsons to be processed")
                files_slice, file_names = file_names[:100], file_names[100:]
                zip_part_counter += 1
                _extract_flat(zip_ref, files_slice, sliced_out_name)
                _zip_and_remove(sliced_out_name)


def _extract_flat(zip_ref, member_names, dest_dir):
    """Extract the given zip members into dest_dir, dropping their folder structure.

    os.path.basename(member) gives just the filename; the member's content is
    written into a new file of that name inside dest_dir.
    """
    for member in member_names:
        file_info = zip_ref.getinfo(member)
        extracted_file_path = os.path.join(dest_dir, os.path.basename(file_info.filename))
        with open(extracted_file_path, 'wb') as extracted_file:
            extracted_file.write(zip_ref.read(member))


def _extract_doi_lists(zip_ref, extra_file_names, dest_dir):
    """Extract the doiList members into dest_dir, skipping directory entries.

    FIX: the original tested `os.path.isdir(non_zip)` on a zip member *name*,
    which checks the local filesystem and is effectively always False; the
    intended check is whether the member is a directory entry ('/' suffix).
    """
    members = [m for m in extra_file_names if 'doiList' in m and not m.endswith('/')]
    _extract_flat(zip_ref, members, dest_dir)


def _zip_and_remove(dir_to_zip):
    """Archive dir_to_zip's contents (relative paths) as <dir_to_zip>.zip, then delete the directory."""
    zip_file_name = dir_to_zip + ".zip"
    with zipfile.ZipFile(zip_file_name, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for foldername, subfolders, filenames in os.walk(dir_to_zip):
            for filename in filenames:
                file_path = os.path.join(foldername, filename)
                zipf.write(file_path, os.path.relpath(file_path, dir_to_zip))
    # After creating the zip, delete the original directory.
    shutil.rmtree(dir_to_zip)

251 

252 

253 

if __name__ == '__main__':
    # Command-line entry point: parse the input/output locations and the
    # worker count, normalize the paths, then run the preprocessing.
    arg_parser = ArgumentParser('jalc.py', description='''This script does the preprocessing of the initial JALC dump, in particular it splits the original zip files in smaller ones if they contain more than 100 JSON files
    and it modifies the dump's original structure by removing the intermediate directories and bringing it to the following structure: dump.zip -> prefixes.zip -> json files''')
    arg_parser.add_argument('-ja', '--jalc', dest='jalc_json_dir', required=True,
                            help='Directory that contains the original zipped dump')
    arg_parser.add_argument('-out', '--output', dest='output_dir', required=True,
                            help='Directory where the files of the original dump will be stored, and the directory will be zipped.')
    arg_parser.add_argument('-m', '--max_workers', required=False, default=1, type=int,
                            help='Workers number')
    cli_args = arg_parser.parse_args()
    preprocessing(
        normalize_path(cli_args.jalc_json_dir),
        normalize_path(cli_args.output_dir),
        cli_args.max_workers,
    )

267