Coverage for test / preprocessing_jalc_test.py: 94%
79 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import os
6import os.path
7import unittest
8import shutil
9from os.path import sep
11from oc_ds_converter.preprocessing.jalc import preprocessing
12import zipfile
# Fixture locations. BASE is relative, so the absolute paths below are
# resolved against the current working directory at import time.
BASE = os.path.join('test', 'preprocessing_jalc')
# Directory holding the input archives handed to ``preprocessing``.
JALC_DIR = os.path.abspath(os.path.join(BASE, 'ZIP_JOCI_TEST'))
# Output location passed to ``preprocessing``; the test reads OUT_DIR + '.zip'.
OUT_DIR = os.path.abspath(os.path.join(BASE, 'OUT_DIR'))
class TestJalcPreprocessing(unittest.TestCase):
    """Check that ``preprocessing`` repackages the JALC dump without losing files.

    The test computes how many entries the output archive is expected to
    contain by inspecting the input archives itself, runs ``preprocessing``,
    and compares that figure with the number of entries actually found in
    ``OUT_DIR + '.zip'`` (nested archives included).
    """

    # Number of JSON files the test expects in one output archive.
    # NOTE(review): presumably mirrors the batch size used by
    # ``preprocessing`` -- confirm against its implementation.
    MAX_JSON_PER_ZIP = 100
    # Suffix of the scratch directories created next to the input archives.
    DECOMPRESSED_SUFFIX = "decompr_zip_dir"

    def setUp(self):
        """Start from a clean state and guarantee cleanup on any outcome."""
        self._remove_artifacts()
        # addCleanup runs even when the assertion fails or the test errors,
        # so a broken run no longer leaves stale artifacts behind.
        self.addCleanup(self._remove_artifacts)

    @classmethod
    def _remove_artifacts(cls):
        """Delete the output archive and every decompression scratch directory."""
        out_zip = OUT_DIR + '.zip'
        if os.path.exists(out_zip):
            os.remove(out_zip)
        if not os.path.isdir(JALC_DIR):
            return
        for el in os.listdir(JALC_DIR):
            if el.endswith(cls.DECOMPRESSED_SUFFIX):
                shutil.rmtree(os.path.join(JALC_DIR, el))

    @classmethod
    def _expected_zip_count(cls, json_count):
        """Return how many archives are expected for ``json_count`` JSON files.

        This is a ceiling division by ``MAX_JSON_PER_ZIP`` with a minimum of
        one, so an exact multiple of the batch size (including exactly one
        full batch) is handled like any other value.
        """
        return max(1, -(-json_count // cls.MAX_JSON_PER_ZIP))

    @classmethod
    def _count_files(cls, dir_with_dump_to_check):
        """Return the number of entries expected in the output archive.

        Every top-level ``*zip`` in ``dir_with_dump_to_check`` is extracted
        into a sibling ``*_decompr_zip_dir`` directory. Plain files found
        there count once each. Every nested ``.zip`` contributes its JSON
        files (those without ``doiList`` in the name), its other non-directory
        entries, and the number of archives its JSON files are expected to be
        split into.
        """
        all_unzipped_files = []
        for zip_lev0 in os.listdir(dir_with_dump_to_check):
            # Skip non-archives and "._" companion files.
            if not zip_lev0.endswith("zip") or zip_lev0.startswith("._"):
                continue
            zip_path = os.path.join(dir_with_dump_to_check, zip_lev0)
            dest_dir = zip_path.replace('.zip', '') + "_" + cls.DECOMPRESSED_SUFFIX
            os.makedirs(dest_dir, exist_ok=True)
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(dest_dir)
            print(f"Unzipped to {dest_dir}")
            for cur_dir, _cur_subdirs, cur_files in os.walk(dest_dir):
                all_unzipped_files.extend(
                    os.path.join(cur_dir, cur_file)
                    for cur_file in cur_files
                    if not cur_file.startswith(".")
                )

        # Files that are not nested archives are counted as they are.
        count = sum(
            1 for path in all_unzipped_files
            if not path.endswith(".zip") and os.path.isfile(path)
        )

        for zip_file_path in all_unzipped_files:
            if not zip_file_path.endswith(".zip"):
                continue
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                names = zip_ref.namelist()
            # The relevant entries: the JSON files with the citation data.
            json_names = {
                name for name in names
                if name.endswith(".json") and "doiList" not in name
            }
            extra_count = sum(
                1 for name in names
                if name not in json_names and not name.endswith('/')
            )
            json_count = sum(1 for name in names if name in json_names)
            count += cls._expected_zip_count(json_count)
            count += extra_count
            count += json_count
        return count

    @classmethod
    def _count_elements_in_zip(cls, zip_file):
        """Count the non-directory entries of an archive, recursing into nested zips.

        ``zip_file`` may be a path or a seekable binary file object. Nested
        archives are read straight from the enclosing archive, so nothing is
        extracted to disk.
        """
        element_count = 0
        with zipfile.ZipFile(zip_file, 'r') as zipf:
            for item in zipf.infolist():
                if item.is_dir():
                    continue
                element_count += 1
                if item.filename.endswith('.zip'):
                    with zipf.open(item) as nested_zip:
                        element_count += cls._count_elements_in_zip(nested_zip)
        return element_count

    def test_base_decompress_and_rearrange(self):
        """The output archive holds exactly the expected number of entries."""
        expected_count = self._count_files(JALC_DIR)
        # NOTE(review): the meaning of the third argument (5) is not visible
        # from this file -- confirm against ``preprocessing``.
        preprocessing(JALC_DIR, OUT_DIR, 5)

        real_count = self._count_elements_in_zip(OUT_DIR + ".zip")
        self.assertEqual(expected_count, real_count)
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()