Coverage for test/preprocessing_jalc_test.py: 94%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import os 

6import os.path 

7import unittest 

8import shutil 

9from os.path import sep 

10 

11from oc_ds_converter.preprocessing.jalc import preprocessing 

12import zipfile 

13 

14 

15 

# Root of the fixture tree used by this test module, relative to the
# directory the tests are launched from.
BASE = os.path.join('test', 'preprocessing_jalc')

# Absolute path of the directory holding the input ZIP dump.
JALC_DIR = os.path.abspath(os.path.join(BASE, 'ZIP_JOCI_TEST'))

# Absolute base path of the output; the test reads the archive found at
# OUT_DIR + '.zip'.
OUT_DIR = os.path.abspath(os.path.join(BASE, 'OUT_DIR'))

22 

23 

24 

25 

class TestJalcPreprocessing(unittest.TestCase):
    """Compare the file count produced by JALC preprocessing with the count
    derived independently from the input dump."""

    # Chunk size used by the expected-count arithmetic.
    # NOTE(review): presumably mirrors the chunking done by `preprocessing`;
    # confirm against that implementation.
    _FILES_PER_ZIP = 100

    @staticmethod
    def _remove_artifacts():
        """Delete the output archive and the scratch extraction directories.

        Safe to call when nothing is present, so it serves both as setup
        and as cleanup.
        """
        out_zip = OUT_DIR + '.zip'
        if os.path.exists(out_zip):
            os.remove(out_zip)
        for el in os.listdir(JALC_DIR):
            if el.endswith("decompr_zip_dir"):
                shutil.rmtree(os.path.join(JALC_DIR, el))

    @classmethod
    def _expected_chunk_count(cls, n_files):
        """Return how many archives `n_files` JSON files are expected to fill.

        This is a ceiling division by ``_FILES_PER_ZIP`` with a floor of 1.
        The previous if/elif chain had no branch for exactly 100 files and
        left the value unbound (or stale from the previous archive).
        """
        return max(1, -(-n_files // cls._FILES_PER_ZIP))

    @classmethod
    def _count_expected_files(cls, dump_dir):
        """Compute the expected element count from the raw dump.

        Every first-level ``*zip`` in `dump_dir` (``._`` files excluded) is
        extracted next to itself into ``<name>_decompr_zip_dir``. Each
        extracted non-zip file counts as one element. Each extracted zip
        contributes its JSON files (those without ``doiList`` in the name),
        its other non-directory members, and the expected number of chunk
        archives.

        :param dump_dir: directory containing the first-level zip files
        :return: the expected number of elements
        """
        unzipped_files = []
        for zip_name in os.listdir(dump_dir):
            if not zip_name.endswith("zip") or zip_name.startswith("._"):
                continue
            zip_path = os.path.join(dump_dir, zip_name)
            dest_dir = zip_path.replace('.zip', '') + "_decompr_zip_dir"
            os.makedirs(dest_dir, exist_ok=True)
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(dest_dir)
            print(f"Unzipped to {dest_dir}")
            for cur_dir, _subdirs, cur_files in os.walk(dest_dir):
                for cur_file in cur_files:
                    # Hidden files (e.g. OS metadata) are not counted.
                    if not cur_file.startswith("."):
                        unzipped_files.append(os.path.join(cur_dir, cur_file))

        count = 0
        for path in unzipped_files:
            if not path.endswith(".zip"):
                if os.path.isfile(path):
                    count += 1
                continue

            with zipfile.ZipFile(path, 'r') as zip_ref:
                names = zip_ref.namelist()
            directories = {name for name in names if name.endswith('/')}
            # The relevant members: JSON files carrying the citation data.
            json_names = [name for name in names
                          if name.endswith(".json") and "doiList" not in name]
            json_set = set(json_names)
            extra_names = [name for name in names
                           if name not in json_set and name not in directories]

            count += cls._expected_chunk_count(len(json_names))
            count += len(extra_names)
            count += len(json_names)
        return count

    @classmethod
    def _count_elements_in_zip(cls, zip_file):
        """Count the non-directory members of `zip_file`, recursing into
        nested ``.zip`` members (a nested archive counts as one element plus
        its own contents).

        :param zip_file: path of the archive to inspect
        :return: total number of elements found
        """
        # Local import keeps this block self-contained; nested archives are
        # unpacked into a throwaway directory instead of a `temp` folder in
        # the current working directory that was never removed.
        import tempfile

        element_count = 0
        with zipfile.ZipFile(zip_file, 'r') as zipf:
            for item in zipf.infolist():
                if item.is_dir():
                    continue
                element_count += 1
                if item.filename.endswith('.zip'):
                    with tempfile.TemporaryDirectory() as tmp_dir:
                        # extract() returns the path it actually wrote to.
                        nested_zip_path = zipf.extract(item, tmp_dir)
                        element_count += cls._count_elements_in_zip(nested_zip_path)
        return element_count

    def test_base_decompress_and_rearrange(self):
        """The output archive must contain exactly the expected elements."""
        self._remove_artifacts()
        # Registered up front so artifacts are removed even if the run or
        # the assertion below fails.
        self.addCleanup(self._remove_artifacts)

        expected_count = self._count_expected_files(JALC_DIR)
        preprocessing(JALC_DIR, OUT_DIR, 5)

        real_count = self._count_elements_in_zip(OUT_DIR + ".zip")
        self.assertEqual(expected_count, real_count)

111 

112 

113 

114 

115 

116 

# Run the test suite when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()

119 

120 

121 

122 

123 

124 

125