Coverage for oc_ds_converter / preprocessing / base.py: 36%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import os.path 

6import pathlib 

7import tarfile 

8import zipfile 

9from abc import ABCMeta, abstractmethod 

10from json import load, loads 

11from os import makedirs, sep, walk 

12from os.path import basename, exists, isdir 

13 

14import zstandard as zstd 

15 

16 

class Preprocessing(metaclass=ABCMeta):
    """Interface for implementing preprocessors for specific datasources.

    It provides the signatures of the methods for preprocessing a dump, plus
    two concrete helpers: :meth:`get_all_files` (list the candidate input
    files of a directory or archive) and :meth:`load_json` (parse one of
    those files as JSON).
    """

    def __init__(self, **params):
        """Preprocessor constructor.

        Every keyword argument is stored on the instance as an attribute
        with the same name.
        """
        for key, value in params.items():
            setattr(self, key, value)

    @staticmethod
    def _is_requested(name, req_type):
        """Return True if *name* ends with *req_type* and is not a dotfile."""
        return name.endswith(req_type) and not basename(name).startswith(".")

    @classmethod
    def _walk_matching(cls, root, req_type):
        """Return the paths of all requested files found by walking *root*."""
        return [
            os.path.join(cur_dir, cur_file)
            for cur_dir, _, cur_files in walk(root)
            for cur_file in cur_files
            if cls._is_requested(cur_file, req_type)
        ]

    def get_all_files(self, i_dir_or_compr, req_type):
        """List the files with suffix *req_type* found in a directory or archive.

        Supported inputs, checked in this order:

        * a directory: it is walked recursively;
        * a path ending with ``tar.gz``: the archive is opened and the
          matching members are returned without being extracted;
        * a path ending with ``zip``: the archive is extracted into
          ``<path>decompr_zip_dir`` and that directory is walked;
        * a path ending with ``zst``: the file is decompressed into
          ``<path>decompr_zst_dir`` (skipped if the output already exists)
          and that directory is walked.

        Files whose base name starts with ``.`` are always ignored. For any
        other input a message is printed and an empty list is returned.

        :param i_dir_or_compr: path (``str`` or ``os.PathLike``) of the
            directory or compressed file.
        :param req_type: suffix the file names must end with (e.g. ``".json"``).
        :return: a tuple ``(result, targz_fd)``. ``result`` is a list of file
            paths (``str``) or, for tar.gz inputs, of ``tarfile.TarInfo``
            members. ``targz_fd`` is the open ``tarfile.TarFile`` for tar.gz
            inputs (the caller is responsible for closing it) and ``None``
            otherwise.
        """
        # Accept os.PathLike inputs too: the suffix checks and the
        # destination directory names below need a plain string.
        path = os.fspath(i_dir_or_compr)
        result = []
        targz_fd = None

        if isdir(path):
            result = self._walk_matching(path, req_type)
        elif path.endswith("tar.gz"):
            targz_fd = tarfile.open(path, "r:gz", encoding="utf-8")
            try:
                for member in targz_fd:
                    # A directory can never be loaded as a file; skip it so
                    # this branch agrees with the directory-walk branches.
                    if member.isdir():
                        continue
                    if self._is_requested(member.name, req_type):
                        result.append(member)
            except BaseException:
                # Do not leak the archive handle if scanning fails.
                targz_fd.close()
                raise
        elif path.endswith("zip"):
            with zipfile.ZipFile(path, "r") as zip_ref:
                dest_dir = path + "decompr_zip_dir"
                makedirs(dest_dir, exist_ok=True)
                zip_ref.extractall(dest_dir)
            result = self._walk_matching(dest_dir, req_type)
        elif path.endswith("zst"):
            input_file = pathlib.Path(path)
            dest_dir = path + "decompr_zst_dir"
            with open(input_file, "rb") as compressed:
                makedirs(dest_dir, exist_ok=True)
                output_path = pathlib.Path(dest_dir) / input_file.stem
                if not exists(output_path):
                    # Decompress to a temporary name and move it into place
                    # only on success: an interrupted run must not leave a
                    # truncated file that later runs would silently reuse.
                    tmp_path = output_path.with_name(output_path.name + ".part")
                    try:
                        with open(tmp_path, "wb") as destination:
                            zstd.ZstdDecompressor().copy_stream(compressed, destination)
                        os.replace(tmp_path, output_path)
                    finally:
                        if exists(tmp_path):
                            os.remove(tmp_path)
            result = self._walk_matching(dest_dir, req_type)
        else:
            print("It is not possible to process the input path.")
        return result, targz_fd

    def load_json(self, file, targz_fd, file_idx, len_all_files):
        """Parse *file* as JSON and return the decoded object.

        :param file: a file path when ``targz_fd`` is ``None``; otherwise a
            member (``tarfile.TarInfo`` or member name) of ``targz_fd``.
        :param targz_fd: the open ``tarfile.TarFile`` containing ``file``, or
            ``None`` when ``file`` is a regular path on disk.
        :param file_idx: index of the file, used only in the progress message.
        :param len_all_files: total number of files, used only in the
            progress message.
        :return: the object decoded from the JSON content.
        :raises ValueError: if ``file`` is a tar member that has no data to
            read (e.g. a directory or a device).
        """
        if targz_fd is None:
            print("Open file %s of %s" % (file_idx, len_all_files))
            with open(file, encoding="utf8") as f:
                return load(f)

        print("Open file %s of %s (in tar.gz archive)" % (file_idx, len_all_files))
        cur_tar_file = targz_fd.extractfile(file)
        if cur_tar_file is None:
            # extractfile() returns None for members that are neither regular
            # files nor links; fail with an explicit message.
            raise ValueError(
                "Cannot read %r from the tar.gz archive: it is not a regular file"
                % (getattr(file, "name", file),)
            )
        # Close the extracted member as soon as its content has been read.
        with cur_tar_file:
            json_str = cur_tar_file.read()

        # The member is read as bytes; decode it explicitly as UTF-8 so the
        # parser always receives text.
        if isinstance(json_str, bytes):
            json_str = json_str.decode("utf-8")

        return loads(json_str)

    @abstractmethod
    def split_input(self):
        """Abstract hook that concrete preprocessors must implement.

        NOTE(review): judging by the name, this is presumably the entry point
        that splits the input dump into smaller units -- confirm against the
        concrete subclasses.
        """
        pass

    def filter(self, data):
        """Optional hook; the base implementation does nothing and returns None."""
        pass

    @abstractmethod
    def splitted_to_file(self, cur_n, target_n, out_dir, data, headers=None):
        """Abstract hook that concrete preprocessors must implement.

        NOTE(review): presumably writes ``data`` to a file in ``out_dir``
        once ``cur_n`` reaches ``target_n``, with ``headers`` as optional
        column names -- confirm against the concrete subclasses.
        """
        pass

102 

103 def filter(self, data): 

104 pass 

105 

106 @abstractmethod 

107 def splitted_to_file(self, cur_n, target_n, out_dir, data, headers=None): 

108 pass