Coverage for oc_ds_converter/lib/jsonmanager.py: 57%
123 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it>
3# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7from __future__ import annotations
9import os
10import gzip
11import os.path
12import pathlib
13import tarfile
14import zipfile
15from json import load, loads
16from os import makedirs, sep, walk
17from os.path import basename, exists, isdir
18from typing import Tuple
20import zstandard as zstd
22from oc_ds_converter.lib.file_manager import init_cache
def get_all_files(is_dir_or_targz_file:str, cache_filepath:str|None=None) -> Tuple[list, tarfile.TarFile|None]:
    """Collect JSON (plain or gzipped) files from a directory or a tar.gz archive.

    Hidden files (basename starting with '.') and files whose basename is
    already in the cache are skipped.

    :param is_dir_or_targz_file: path of a directory or of a .tar.gz archive.
    :param cache_filepath: optional path of the cache of processed basenames.
    :return: a tuple (collected files, open tar descriptor or None). For a
        directory the list holds paths; for a tar.gz it holds TarInfo members
        and the caller must close the returned descriptor.
    """
    collected = []
    tar_handle = None
    processed = init_cache(cache_filepath)
    if isdir(is_dir_or_targz_file):
        for folder, _, filenames in walk(is_dir_or_targz_file):
            for name in filenames:
                is_json = name.endswith(".json") or name.endswith(".json.gz")
                if is_json and not basename(name).startswith(".") and basename(name) not in processed:
                    collected.append(folder + sep + name)
    elif is_dir_or_targz_file.endswith("tar.gz"):
        tar_handle = tarfile.open(is_dir_or_targz_file, "r:gz", encoding="utf-8")
        for member in tar_handle:
            member_base = basename(member.name)
            if member.name.endswith(".json") and not member_base.startswith(".") and member_base not in processed:
                collected.append(member)
    else:
        print("It is not possible to process the input path.")
    return collected, tar_handle
def load_json(file:str|tarfile.TarInfo, targz_fd:tarfile.TarFile | None) -> dict|None:
    """Load a JSON document from a plain file, a gzipped file, or a tar.gz member.

    :param file: a filesystem path (when targz_fd is None) or a TarInfo member
        of the open archive.
    :param targz_fd: the open tar descriptor the member belongs to, or None
        when `file` is a path on disk.
    :return: the parsed JSON content, or None when the path matches neither
        '.json' nor '.json.gz'.
    """
    result = None
    if targz_fd is None:
        if file.endswith(".json"): # type: ignore
            with open(file, encoding="utf8") as f: # type: ignore
                result = load(f)
        elif file.endswith(".json.gz"): # type: ignore
            with gzip.open(file, 'r') as gzip_file: # type: ignore
                data = gzip_file.read()
            result = loads(data.decode('utf-8'))
    else:
        # extractfile returns a file object that must be closed explicitly:
        # the original code leaked it.
        cur_tar_file = targz_fd.extractfile(file)
        try:
            json_str = cur_tar_file.read() # type: ignore
        finally:
            cur_tar_file.close() # type: ignore
        # Python 3.5's extractfile returned bytes that json's 'load' could not
        # handle; decode defensively before parsing (harmless on modern
        # versions, which also return bytes from read()).
        if isinstance(json_str, bytes):
            json_str = json_str.decode("utf-8")
        result = loads(json_str)
    return result
68def _is_container_zip(zip_path: str) -> bool:
69 """Check if a zip contains other zips (container) vs data files (final)."""
70 with zipfile.ZipFile(zip_path, 'r') as zf:
71 names = zf.namelist()
72 return any(n.endswith('.zip') for n in names)
def _collect_final_zips(zip_path: str, cache: set[str], dest_dir: str) -> list[str]:
    """Recursively extract container zips and collect the final (data) zips.

    :param zip_path: path of the container zip to extract.
    :param cache: basenames already processed; matching nested zips are skipped.
    :param dest_dir: directory where `zip_path` is extracted (created if needed).
    :return: paths of every non-container zip found, recursing into nested
        containers.
    """
    result: list[str] = []
    makedirs(dest_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zf:
        zf.extractall(dest_dir)
    for root, _, files in walk(dest_dir):
        for f in files:
            if f.endswith('.zip') and not f.startswith('.') and f not in cache:
                nested_path = os.path.join(root, f)
                if _is_container_zip(nested_path):
                    # Strip only the trailing '.zip'. The previous
                    # str.replace('.zip', '') removed the FIRST occurrence
                    # anywhere in the path, mangling names such as
                    # 'a.zip.d/b.zip'. The slice is safe because the
                    # endswith('.zip') check above guarantees the suffix.
                    nested_dest = nested_path[:-len('.zip')] + "_decompr_zip_dir"
                    result.extend(_collect_final_zips(nested_path, cache, nested_dest))
                else:
                    result.append(nested_path)
    return result
def get_all_files_by_type(i_dir_or_compr: str, req_type: str, cache_filepath: str | None = None):
    """Collect all files matching `req_type` from a directory or compressed input.

    Supported inputs: a plain directory, a .tar.gz archive (members are
    returned as TarInfo objects together with the open descriptor), a .tar
    archive and a .zip / .zst file (both extracted to a sibling
    '*_decompr_*_dir' directory first). Hidden files and files whose basename
    is already in the cache are skipped.

    :param i_dir_or_compr: path of the directory or compressed file to scan.
    :param req_type: file extension to look for (e.g. ".json", ".zip").
    :param cache_filepath: optional path of the cache of processed basenames.
    :return: a tuple (matching files, open tar.gz descriptor or None). The
        caller must close the descriptor for the tar.gz case.
    """
    result: list[str | tarfile.TarInfo] = []
    targz_fd = None
    cache = init_cache(cache_filepath)

    if isdir(i_dir_or_compr):
        for cur_dir, cur_subdir, cur_files in walk(i_dir_or_compr):
            for cur_file in cur_files:
                if cur_file.endswith(req_type) and not basename(cur_file).startswith(".") and basename(cur_file) not in cache:
                    file_path = os.path.join(cur_dir, cur_file)
                    # Container zips (zips of zips) are expanded recursively;
                    # only the final data zips end up in the result.
                    if req_type == ".zip" and _is_container_zip(file_path):
                        # NOTE(review): str.replace removes the FIRST '.zip'
                        # occurrence, not necessarily the suffix — fine for
                        # typical names, surprising when '.zip' appears
                        # elsewhere in the path.
                        dest_base = file_path.replace('.zip', '') + "_decompr_zip_dir"
                        result.extend(_collect_final_zips(file_path, cache, dest_base))
                    else:
                        result.append(file_path)
    elif i_dir_or_compr.endswith("tar.gz"):
        # The archive is scanned in place: TarInfo members are collected and
        # the open descriptor is handed back so callers can read members
        # later (see load_json); the caller closes it.
        targz_fd = tarfile.open(i_dir_or_compr, "r:gz", encoding="utf-8")
        for cur_file in targz_fd:
            if cur_file.name.endswith(req_type) and not basename(cur_file.name).startswith(".") and basename(cur_file.name) not in cache:
                result.append(cur_file)
    elif i_dir_or_compr.endswith(".tar"):
        # Plain tar files are fully extracted to disk first. filter='data'
        # rejects unsafe members (absolute paths, links escaping the
        # destination); requires a Python with extraction-filter support.
        dest_dir = i_dir_or_compr.replace('.tar', '') + "_decompr_zip_dir"
        targz_fd = tarfile.open(i_dir_or_compr, "r:*", encoding="utf-8")
        targz_fd.extractall(dest_dir, filter='data')
        for cur_dir, cur_subdir, cur_files in walk(dest_dir):
            for cur_file in cur_files:
                if cur_file.endswith(req_type) and not basename(cur_file).startswith(".") and basename(cur_file) not in cache:
                    result.append(cur_dir + sep + cur_file)
        # Closed here because the matching files now live on disk; note the
        # closed descriptor is still returned to the caller.
        targz_fd.close()
    elif i_dir_or_compr.endswith("zip"):
        if req_type == ".zip":
            # The input zip itself is the requested type: expand it when it
            # is a container of nested zips, otherwise return it as-is
            # (unless its basename is cached).
            if _is_container_zip(i_dir_or_compr):
                dest_base = i_dir_or_compr.replace('.zip', '') + "_decompr_zip_dir"
                result.extend(_collect_final_zips(i_dir_or_compr, cache, dest_base))
            elif basename(i_dir_or_compr) not in cache:
                result.append(i_dir_or_compr)
        else:
            # Looking for some other extension inside the zip: extract
            # everything, then scan the extraction directory.
            dest_dir = i_dir_or_compr.replace('.zip', '') + "_decompr_zip_dir"
            if not exists(dest_dir):
                makedirs(dest_dir)
            with zipfile.ZipFile(i_dir_or_compr, 'r') as zip_ref:
                zip_ref.extractall(dest_dir)
            for cur_dir, cur_subdir, cur_files in walk(dest_dir):
                for cur_file in cur_files:
                    if cur_file.endswith(req_type) and not basename(cur_file).startswith(".") and basename(cur_file) not in cache:
                        result.append(cur_dir + sep + cur_file)
    elif i_dir_or_compr.endswith("zst"):
        input_file = pathlib.Path(i_dir_or_compr)
        # NOTE(review): split(".")[0] truncates at the FIRST dot in the whole
        # path, so inputs under dotted directories get a surprising
        # destination name — confirm callers only pass simple paths.
        dest_dir = i_dir_or_compr.split(".")[0] + "_decompr_zst_dir"
        with open(input_file, 'rb') as compressed:
            decomp = zstd.ZstdDecompressor()
            if not exists(dest_dir):
                makedirs(dest_dir)
            output_path = pathlib.Path(dest_dir) / input_file.stem
            # Decompress only once: skip when a previous run already
            # produced the output file.
            if not exists(output_path):
                with open(output_path, 'wb') as destination:
                    decomp.copy_stream(compressed, destination)
        for cur_dir, cur_subdir, cur_files in walk(dest_dir):
            for cur_file in cur_files:
                if cur_file.endswith(req_type) and not basename(cur_file).startswith(".") and basename(cur_file) not in cache:
                    result.append(cur_dir + sep + cur_file)
    else:
        print("It is not possible to process the input path.", i_dir_or_compr)
    return result, targz_fd