Coverage for oc_ds_converter / lib / jsonmanager.py: 57%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it> 

3# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7from __future__ import annotations 

8 

9import os 

10import gzip 

11import os.path 

12import pathlib 

13import tarfile 

14import zipfile 

15from json import load, loads 

16from os import makedirs, sep, walk 

17from os.path import basename, exists, isdir 

18from typing import Tuple 

19 

20import zstandard as zstd 

21 

22from oc_ds_converter.lib.file_manager import init_cache 

23 

24 

def get_all_files(is_dir_or_targz_file:str, cache_filepath:str|None=None) -> Tuple[list, tarfile.TarFile|None]:
    """Collect the JSON files to process from a directory or a ``.tar.gz`` archive.

    For a directory, ``.json`` and ``.json.gz`` files are gathered recursively as
    filesystem paths; for a ``.tar.gz`` archive, matching members are returned as
    ``TarInfo`` objects together with the open ``TarFile`` (the caller owns it).

    :param is_dir_or_targz_file: path to a directory or to a ``.tar.gz`` archive
    :param cache_filepath: optional cache file whose listed basenames are skipped
    :return: a tuple (collected files, open TarFile or None)
    """
    collected = []
    opened_targz = None
    cache = init_cache(cache_filepath)
    if isdir(is_dir_or_targz_file):
        for walked_dir, _, walked_files in walk(is_dir_or_targz_file):
            for name in walked_files:
                wanted = name.endswith((".json", ".json.gz"))
                hidden = basename(name).startswith(".")
                if wanted and not hidden and basename(name) not in cache:
                    collected.append(walked_dir + sep + name)
    elif is_dir_or_targz_file.endswith("tar.gz"):
        opened_targz = tarfile.open(is_dir_or_targz_file, "r:gz", encoding="utf-8")
        for member in opened_targz:
            member_base = basename(member.name)
            if member.name.endswith(".json") and not member_base.startswith(".") and member_base not in cache:
                collected.append(member)
    else:
        print("It is not possible to process the input path.")
    return collected, opened_targz

42 

def load_json(file:str|tarfile.TarInfo, targz_fd:tarfile.TarFile | None) -> dict|None:
    """Deserialize a JSON document from a plain file, a gzipped file, or a tar.gz member.

    :param file: a filesystem path (when ``targz_fd`` is None) or a ``TarInfo`` member
    :param targz_fd: the open ``TarFile`` owning ``file``, or None for filesystem paths
    :return: the parsed JSON object, or None when the path has an unsupported extension
    """
    if targz_fd is not None:
        extracted = targz_fd.extractfile(file)
        raw = extracted.read()  # type: ignore
        # In Python 3.5 the extractfile method returned a 'bytes' object that
        # the json 'load' function could not manage, so decode defensively
        # before parsing; Python 3.9 does not show this behaviour.
        if type(raw) is bytes:
            raw = raw.decode("utf-8")
        return loads(raw)
    if file.endswith(".json"):  # type: ignore
        with open(file, encoding="utf8") as plain:  # type: ignore
            return load(plain)
    if file.endswith(".json.gz"):  # type: ignore
        with gzip.open(file, 'r') as compressed:  # type: ignore
            return loads(compressed.read().decode('utf-8'))
    return None

66 

67 

68def _is_container_zip(zip_path: str) -> bool: 

69 """Check if a zip contains other zips (container) vs data files (final).""" 

70 with zipfile.ZipFile(zip_path, 'r') as zf: 

71 names = zf.namelist() 

72 return any(n.endswith('.zip') for n in names) 

73 

74 

75def _collect_final_zips(zip_path: str, cache: set[str], dest_dir: str) -> list[str]: 

76 """Recursively extract container zips and collect final zips.""" 

77 result: list[str] = [] 

78 makedirs(dest_dir, exist_ok=True) 

79 

80 with zipfile.ZipFile(zip_path, 'r') as zf: 

81 zf.extractall(dest_dir) 

82 

83 for root, _, files in walk(dest_dir): 

84 for f in files: 

85 if f.endswith('.zip') and not f.startswith('.') and f not in cache: 

86 nested_path = os.path.join(root, f) 

87 if _is_container_zip(nested_path): 

88 nested_dest = nested_path.replace('.zip', '') + "_decompr_zip_dir" 

89 result.extend(_collect_final_zips(nested_path, cache, nested_dest)) 

90 else: 

91 result.append(nested_path) 

92 return result 

93 

94 

def get_all_files_by_type(i_dir_or_compr: str, req_type: str, cache_filepath: str | None = None):
    """Collect every file with extension ``req_type`` from a directory or compressed input.

    Supported inputs: a plain directory, a ``.tar.gz`` archive (matching members
    are returned as ``TarInfo`` objects alongside the open ``TarFile``), a
    ``.tar`` archive, a zip (possibly a container of nested zips), or a
    zstandard-compressed (``.zst``) file. Archives other than ``.tar.gz`` are
    extracted next to the input into a ``*_decompr_*_dir`` directory.

    :param i_dir_or_compr: path to the directory or compressed file to scan
    :param req_type: required file extension, e.g. ``".json"`` or ``".zip"``
    :param cache_filepath: optional cache file whose listed basenames are skipped
    :return: a tuple (list of paths or TarInfo members, open TarFile or None)
    """
    result: list[str | tarfile.TarInfo] = []
    targz_fd = None
    cache = init_cache(cache_filepath)

    if isdir(i_dir_or_compr):
        for cur_dir, cur_subdir, cur_files in walk(i_dir_or_compr):
            for cur_file in cur_files:
                if cur_file.endswith(req_type) and not basename(cur_file).startswith(".") and basename(cur_file) not in cache:
                    file_path = os.path.join(cur_dir, cur_file)
                    if req_type == ".zip" and _is_container_zip(file_path):
                        # Container zips are expanded recursively; only final
                        # (data-holding) zips are returned.
                        dest_base = file_path.removesuffix('.zip') + "_decompr_zip_dir"
                        result.extend(_collect_final_zips(file_path, cache, dest_base))
                    else:
                        result.append(file_path)
    elif i_dir_or_compr.endswith("tar.gz"):
        targz_fd = tarfile.open(i_dir_or_compr, "r:gz", encoding="utf-8")
        for cur_file in targz_fd:
            if cur_file.name.endswith(req_type) and not basename(cur_file.name).startswith(".") and basename(cur_file.name) not in cache:
                result.append(cur_file)
    elif i_dir_or_compr.endswith(".tar"):
        # removesuffix strips only the trailing ".tar"; str.replace would also
        # rewrite a ".tar" occurring earlier in the path.
        dest_dir = i_dir_or_compr.removesuffix('.tar') + "_decompr_zip_dir"
        targz_fd = tarfile.open(i_dir_or_compr, "r:*", encoding="utf-8")
        targz_fd.extractall(dest_dir, filter='data')
        for cur_dir, cur_subdir, cur_files in walk(dest_dir):
            for cur_file in cur_files:
                if cur_file.endswith(req_type) and not basename(cur_file).startswith(".") and basename(cur_file) not in cache:
                    result.append(cur_dir + sep + cur_file)
        targz_fd.close()
    elif i_dir_or_compr.endswith("zip"):
        if req_type == ".zip":
            if _is_container_zip(i_dir_or_compr):
                dest_base = i_dir_or_compr.removesuffix('.zip') + "_decompr_zip_dir"
                result.extend(_collect_final_zips(i_dir_or_compr, cache, dest_base))
            elif basename(i_dir_or_compr) not in cache:
                # A final zip is returned as-is, without extraction.
                result.append(i_dir_or_compr)
        else:
            dest_dir = i_dir_or_compr.removesuffix('.zip') + "_decompr_zip_dir"
            if not exists(dest_dir):
                makedirs(dest_dir)
            with zipfile.ZipFile(i_dir_or_compr, 'r') as zip_ref:
                zip_ref.extractall(dest_dir)
            for cur_dir, cur_subdir, cur_files in walk(dest_dir):
                for cur_file in cur_files:
                    if cur_file.endswith(req_type) and not basename(cur_file).startswith(".") and basename(cur_file) not in cache:
                        result.append(cur_dir + sep + cur_file)
    elif i_dir_or_compr.endswith("zst"):
        input_file = pathlib.Path(i_dir_or_compr)
        # Strip only the final extension: splitting the whole path on "." broke
        # for any path containing a dot before the extension (e.g. "./data/x.zst"
        # yielded an empty prefix).
        dest_dir = str(input_file.with_suffix('')) + "_decompr_zst_dir"
        with open(input_file, 'rb') as compressed:
            decomp = zstd.ZstdDecompressor()
            if not exists(dest_dir):
                makedirs(dest_dir)
            output_path = pathlib.Path(dest_dir) / input_file.stem
            # Skip decompression when a previous run already produced the output.
            if not exists(output_path):
                with open(output_path, 'wb') as destination:
                    decomp.copy_stream(compressed, destination)
        for cur_dir, cur_subdir, cur_files in walk(dest_dir):
            for cur_file in cur_files:
                if cur_file.endswith(req_type) and not basename(cur_file).startswith(".") and basename(cur_file) not in cache:
                    result.append(cur_dir + sep + cur_file)
    else:
        print("It is not possible to process the input path.", i_dir_or_compr)
    return result, targz_fd