Coverage for oc_ds_converter / preprocessing / base.py: 36%
74 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import os.path
6import pathlib
7import tarfile
8import zipfile
9from abc import ABCMeta, abstractmethod
10from json import load, loads
11from os import makedirs, sep, walk
12from os.path import basename, exists, isdir
14import zstandard as zstd
class Preprocessing(metaclass=ABCMeta):
    """Interface for implementing preprocessors for specific datasources.

    Subclasses must implement :meth:`split_input` and :meth:`splitted_to_file`.
    The concrete methods below provide the shared logic for locating input
    files inside a directory or a compressed archive (tar.gz, zip, zst) and
    for loading their JSON content.
    """

    def __init__(self, **params):
        """Store every keyword argument as an instance attribute."""
        for key, value in params.items():
            setattr(self, key, value)

    def get_all_files(self, i_dir_or_compr, req_type):
        """Collect all input files of type ``req_type`` from ``i_dir_or_compr``.

        :param i_dir_or_compr: a directory path, or an archive path ending in
            ``tar.gz``, ``zip`` or ``zst``. Zip and zst archives are
            decompressed on disk next to the input (suffix
            ``decompr_zip_dir`` / ``decompr_zst_dir``); tar.gz archives are
            read in place.
        :param req_type: the required file extension (e.g. ``".json"``).
            Hidden files (basename starting with ``.``) are skipped.
        :return: a tuple ``(result, targz_fd)``. For a tar.gz input,
            ``result`` contains ``TarInfo`` members and ``targz_fd`` is the
            open tar archive (the caller is responsible for closing it and
            must pass it to :meth:`load_json`); in every other case
            ``result`` contains file paths and ``targz_fd`` is ``None``.
        """
        result = []
        targz_fd = None

        if isdir(i_dir_or_compr):
            for cur_dir, _, cur_files in walk(i_dir_or_compr):
                for cur_file in cur_files:
                    if cur_file.endswith(req_type) and not basename(cur_file).startswith("."):
                        result.append(os.path.join(cur_dir, cur_file))
        elif i_dir_or_compr.endswith("tar.gz"):
            # The archive is not extracted: members are returned together
            # with the open descriptor so load_json can read them lazily.
            targz_fd = tarfile.open(i_dir_or_compr, "r:gz", encoding="utf-8")
            for member in targz_fd:
                if member.name.endswith(req_type) and not basename(member.name).startswith("."):
                    result.append(member)
        elif i_dir_or_compr.endswith("zip"):
            dest_dir = i_dir_or_compr + "decompr_zip_dir"
            with zipfile.ZipFile(i_dir_or_compr, 'r') as zip_ref:
                if not exists(dest_dir):
                    makedirs(dest_dir)
                zip_ref.extractall(dest_dir)
            for cur_dir, _, cur_files in walk(dest_dir):
                for cur_file in cur_files:
                    if cur_file.endswith(req_type) and not basename(cur_file).startswith("."):
                        result.append(os.path.join(cur_dir, cur_file))
        elif i_dir_or_compr.endswith("zst"):
            input_file = pathlib.Path(i_dir_or_compr)
            dest_dir = i_dir_or_compr + "decompr_zst_dir"
            with open(input_file, 'rb') as compressed:
                decomp = zstd.ZstdDecompressor()
                if not exists(dest_dir):
                    makedirs(dest_dir)
                output_path = pathlib.Path(dest_dir) / input_file.stem
                # Decompress only once: skip when the output already exists.
                if not exists(output_path):
                    with open(output_path, 'wb') as destination:
                        decomp.copy_stream(compressed, destination)
            for cur_dir, _, cur_files in walk(dest_dir):
                for cur_file in cur_files:
                    if cur_file.endswith(req_type) and not basename(cur_file).startswith("."):
                        result.append(os.path.join(cur_dir, cur_file))
        else:
            print("It is not possible to process the input path.")
        return result, targz_fd

    def load_json(self, file, targz_fd, file_idx, len_all_files):
        """Load and return the parsed JSON content of ``file``.

        :param file: a file path (plain input) or a ``TarInfo`` member
            (tar.gz input).
        :param targz_fd: the open tar archive returned by
            :meth:`get_all_files`, or ``None`` when ``file`` is a plain path.
        :param file_idx: 1-based index of the file, used for progress output.
        :param len_all_files: total number of files, used for progress output.
        :return: the deserialized JSON object.
        """
        if targz_fd is None:
            print("Open file %s of %s" % (file_idx, len_all_files))
            with open(file, encoding="utf8") as f:
                return load(f)

        print("Open file %s of %s (in tar.gz archive)" % (file_idx, len_all_files))
        cur_tar_file = targz_fd.extractfile(file)
        json_str = cur_tar_file.read()
        # extractfile may return bytes (observed on Python 3.5); decode so
        # that loads always receives a str. Python 3.9+ does not show this
        # behaviour, but the decode is harmless there.
        if isinstance(json_str, bytes):
            json_str = json_str.decode("utf-8")
        return loads(json_str)

    @abstractmethod
    def split_input(self):
        """Split the input dump into smaller files.

        Must be implemented by datasource-specific subclasses.
        """
        pass

    def filter(self, data):
        """Optional hook for filtering loaded data; default is a no-op."""
        pass

    @abstractmethod
    def splitted_to_file(self, cur_n, target_n, out_dir, data, headers=None):
        """Write a chunk of split data to ``out_dir``.

        Must be implemented by datasource-specific subclasses.
        """
        pass