Coverage for oc_ds_converter / preprocessing / jalc.py: 31%
160 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it>
2# SPDX-FileCopyrightText: 2023-2024 Arianna Moretti <arianna.moretti4@unibo.it>
3# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7import os
8import shutil
9import zipfile
10from argparse import ArgumentParser
11from os import makedirs, sep, walk
12from os.path import basename, exists
14from concurrent.futures import ProcessPoolExecutor
15from multiprocessing import get_context
17from tqdm import tqdm
19from oc_ds_converter.lib.file_manager import normalize_path
def preprocessing(jalc_json_dir: str, output_dir: str, max_workers: int = 1):
    """Preprocess the original JALC zipped dump.

    The original dump has the following structure:
        - jalc_dump.zip
            - jalc_dataset_dir
                - prefixes.json
                - prefix1.zip
                    - prefix1_dir
                        - doiList1.json
                        - doi1A.json
                        - doi1B.json
                        - etc.
                - prefix2.zip
                    - prefix2_dir
                        - doiList2.json
                        - doi2A.json
                        - etc.

    The preprocessing removes the intermediate folders (like jalc_dump,
    prefix1, prefix2 in the example above) and, if a prefix.zip file contains
    more than 100 JSON files, splits it into (number of json files // 100 + 1)
    zip files. When a prefix.zip file is divided into more zip files, the
    doiList.json is copied just in the first subfile.

    :param jalc_json_dir: directory containing the original zipped dump
    :param output_dir: directory where the preprocessed files are stored; it
        is zipped and removed at the end of the process
    :param max_workers: number of parallel worker processes (1 = sequential)
    """
    # Files starting with "._" (macOS resource forks) are skipped, as is any
    # zip file whose decompressed directory is already present next to it.
    els_to_be_skipped = []
    input_dir_cont = os.listdir(jalc_json_dir)
    for el in input_dir_cont:  # should be one (the input dir contains 1 zip)
        if el.startswith("._"):
            els_to_be_skipped.append(os.path.join(jalc_json_dir, el))
        elif el.endswith(".zip"):
            base_name = el.replace('.zip', '')
            if any(x.startswith(base_name) and x.endswith("decompr_zip_dir")
                   for x in input_dir_cont):
                els_to_be_skipped.append(os.path.join(jalc_json_dir, el))

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    all_unzipped_files = []
    # If a skipped zip already has a decompressed folder in the input
    # directory, reuse that folder to list the files of the original dump
    # instead of extracting again.
    els_to_be_skipped_cont = [x for x in els_to_be_skipped if x.endswith(".zip")]
    for el_to_skip in els_to_be_skipped_cont:
        if el_to_skip.startswith("._"):
            continue
        base_name_el_to_skip = os.path.basename(el_to_skip).replace('.zip', '')
        for directory in os.listdir(jalc_json_dir):
            if directory == base_name_el_to_skip + "_decompr_zip_dir":
                for dir_lev2 in os.listdir(os.path.join(jalc_json_dir, directory)):
                    all_unzipped_files = [
                        os.path.join(jalc_json_dir, directory, dir_lev2, file)
                        for file in os.listdir(os.path.join(jalc_json_dir, directory, dir_lev2))
                        if not file.startswith("._")]

    # Otherwise extract the whole dump into a sibling folder named
    # "<dump basename>_decompr_zip_dir" and collect every file in it.
    if len(all_unzipped_files) == 0:
        for zip_lev0 in os.listdir(jalc_json_dir):
            if zip_lev0.endswith("zip") and not zip_lev0.startswith("._"):
                with zipfile.ZipFile(os.path.join(jalc_json_dir, zip_lev0), 'r') as zip_ref:
                    dest_dir = os.path.join(jalc_json_dir, zip_lev0).replace('.zip', '') + "_decompr_zip_dir"
                    if not exists(dest_dir):
                        makedirs(dest_dir)
                    zip_ref.extractall(dest_dir)
                    print(f"Unzipped to {dest_dir}")
                    for cur_dir, cur_subdir, cur_files in walk(dest_dir):
                        for cur_file in cur_files:
                            if not basename(cur_file).startswith("."):
                                all_unzipped_files.append(cur_dir + sep + cur_file)

    # Partition into the prefix zips to be repackaged and the extra files.
    zip_files = [file for file in all_unzipped_files if file.endswith(".zip")]
    not_zip_files = [file for file in all_unzipped_files if not file.endswith(".zip")]

    # Copy all extra files as they are in the output dir.
    for file in not_zip_files:
        if os.path.isfile(file):
            shutil.copy(file, output_dir)

    if max_workers == 1:
        for zip_file in tqdm(zip_files, desc="Processing ZIP Files"):
            process_zip(zip_file, output_dir)
    else:
        # BUGFIX: the original submitted futures without ever collecting them,
        # so exceptions raised inside workers were silently discarded and the
        # progress bar tracked submission rather than completion. Collect the
        # futures and call result() to surface failures and report real
        # progress.
        with ProcessPoolExecutor(max_workers=max_workers, mp_context=get_context('spawn')) as executor:
            futures = [executor.submit(process_zip, zip_file, output_dir)
                       for zip_file in zip_files]
            for future in tqdm(futures, desc="Processing ZIP Files"):
                future.result()  # re-raises any exception from the worker

    # At the end of the process, archive the output directory and remove the
    # uncompressed copy.
    print("Zipping the output directory")
    shutil.make_archive(output_dir, 'zip', output_dir)
    print("Removing output directory")
    shutil.rmtree(output_dir)  # Delete the original directory
121def process_zip(zip_file, output_dir2):
123 if not zip_file.endswith(".zip"):
124 shutil.copy(zip_file, output_dir2)
126 else:
127 # manage the resizing
128 # Full path to the zip file
129 zip_file_path = zip_file
130 first_level_path = os.path.dirname(zip_file_path)
131 print("executing", zip_file_path)
132 # Open the zip file for reading
133 with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
134 all_files_in_zip = zip_ref.namelist()
136 directories = [entry for entry in all_files_in_zip if entry.endswith('/')]
137 # List the names of files within the zip (the relevant one, the json containing the citation information)
138 file_names = [x for x in all_files_in_zip if x.endswith(".json") and "doiList" not in x and x not in directories]
139 extra_file_names = [x for x in all_files_in_zip if "doiList" in x or x.endswith(".out")]
141 if len(file_names) <= 100:
142 print("less than 100")
143 zip_file_name = os.path.basename(zip_file_path).replace(".zip", "")
144 dir_to_zip = os.path.join(output_dir2, zip_file_name)
145 os.makedirs(dir_to_zip, exist_ok=True)
146 for non_zip in extra_file_names:
147 if not os.path.isdir(non_zip) and 'doiList' in non_zip:
148 '''We use os.path.basename(file_info.filename) to extract just the filename
149 without the folder structure. We create a new file in the destination folder
150 with the extracted filename and write the content of the file from the ZIP archive into it.'''
151 file_info = zip_ref.getinfo(non_zip)
152 extracted_file_path = os.path.join(dir_to_zip, os.path.basename(file_info.filename))
153 with open(extracted_file_path, 'wb') as extracted_file:
154 extracted_file.write(zip_ref.read(non_zip))
155 for fs in file_names:
156 file_info = zip_ref.getinfo(fs)
157 extracted_file_path = os.path.join(dir_to_zip,
158 os.path.basename(file_info.filename))
159 with open(extracted_file_path, 'wb') as extracted_file:
160 extracted_file.write(zip_ref.read(fs))
162 # Parent directory
163 parent_directory = os.path.dirname(dir_to_zip)
164 # Name the zip file
165 zip_file_name = dir_to_zip + ".zip"
166 # Full path to the destination zip file
167 zip_file_path = os.path.join(parent_directory, zip_file_name)
169 with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
170 for foldername, subfolders, filenames in os.walk(dir_to_zip):
171 for filename in filenames:
172 file_path = os.path.join(foldername, filename)
173 relative_path = os.path.relpath(file_path, dir_to_zip)
174 zipf.write(file_path, relative_path)
175 # After creating the zip, delete the original directory
176 shutil.rmtree(dir_to_zip)
177 #print("Just Completed zip:", zip_file_path)
179 else:
180 # if there are more than 100 jsons, they should be divided as many zips as the total number of jsons/ 100.
181 # The first of the new sub-zip files should also contain all the extra files which where stored in the original zip
182 print("more than 100: ", len(file_names))
183 zip_basename = zip_file_path.replace(".zip", "").replace(first_level_path, "").replace("/", "").replace(
184 "\\", "")
185 zip_part_counter = 0
187 while len(file_names):
188 # name of the new folder (prefix_0.zip)
189 zip_basename_w_counter = zip_basename + "_" + str(zip_part_counter)
190 # path of the new folder where files will be extracted
191 sliced_out_name = os.path.join(output_dir2, zip_basename_w_counter)
192 os.makedirs(sliced_out_name, exist_ok=True)
194 # copying in the output folder all the extra files (not json files with information on the bibliographic entities)
195 if zip_part_counter == 0:
196 for non_zip in extra_file_names:
197 if not os.path.isdir(non_zip) and 'doiList' in non_zip:
198 '''We use os.path.basename(file_info.filename) to extract just the filename
199 without the folder structure. We create a new file in the destination folder
200 with the extracted filename and write the content of the file from the ZIP archive into it.'''
201 file_info = zip_ref.getinfo(non_zip)
202 extracted_file_path = os.path.join(sliced_out_name, os.path.basename(file_info.filename))
203 with open(extracted_file_path, 'wb') as extracted_file:
204 extracted_file.write(zip_ref.read(non_zip))
206 if len(file_names) > 100:
207 print("json to be processed in this zip:", len(file_names))
209 files_slice = file_names[:100]
210 file_names = file_names[100:]
211 zip_part_counter +=1
213 for fs in files_slice:
214 file_info = zip_ref.getinfo(fs)
215 extracted_file_path = os.path.join(sliced_out_name,
216 os.path.basename(file_info.filename))
217 with open(extracted_file_path, 'wb') as extracted_file:
218 extracted_file.write(zip_ref.read(fs))
219 else:
220 print("last", len(file_names), "jsons to be processed")
221 files_slice = file_names
222 file_names = []
223 zip_part_counter += 1
225 for fs in files_slice:
226 file_info = zip_ref.getinfo(fs)
227 extracted_file_path = os.path.join(sliced_out_name,
228 os.path.basename(file_info.filename))
229 with open(extracted_file_path, 'wb') as extracted_file:
230 extracted_file.write(zip_ref.read(fs))
232 # Directory to zip and delete
233 directory_to_zip = sliced_out_name
234 # Parent directory
235 parent_directory = os.path.dirname(directory_to_zip)
236 # Name the zip file
237 zip_file_name = directory_to_zip + ".zip"
238 # Full path to the destination zip file
239 zip_file_path = os.path.join(parent_directory, zip_file_name)
240 # create a zip file
241 with zipfile.ZipFile(zip_file_name, 'w', zipfile.ZIP_DEFLATED) as zipf:
242 for foldername, subfolders, filenames in os.walk(directory_to_zip):
243 for filename in filenames:
244 file_path = os.path.join(foldername, filename)
245 relative_path = os.path.relpath(file_path, directory_to_zip)
246 zipf.write(file_path, relative_path)
247 # After creating the zip, delete the original directory
248 shutil.rmtree(directory_to_zip)
249 #print("Just Completed zip:", zip_file_path)
250 zip_ref.close()
if __name__ == '__main__':
    # Command-line entry point: parse the input/output paths, normalize them
    # and run the preprocessing.
    parser = ArgumentParser('jalc.py', description='''This script does the preprocessing of the initial JALC dump, in particular it splits the original zip files in smaller ones if they contain more than 100 JSON files
    and it modifies the dump's original structure by removing the intermediate directories and bringing it to the following structure: dump.zip -> prefixes.zip -> json files''')
    parser.add_argument('-ja', '--jalc', dest='jalc_json_dir', required=True, help='Directory that contains the original zipped dump')
    parser.add_argument('-out', '--output', dest='output_dir', required=True, help='Directory where the files of the original dump will be stored, and the directory will be zipped.')
    parser.add_argument('-m', '--max_workers', required=False, default=1, type=int, help='Workers number')
    cli_args = parser.parse_args()
    preprocessing(
        normalize_path(cli_args.jalc_json_dir),
        normalize_path(cli_args.output_dir),
        cli_args.max_workers,
    )