Coverage for oc_meta / run / infodir / gen.py: 66%

116 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7from __future__ import annotations 

8 

9import argparse 

10import multiprocessing 

11import os 

12import zipfile 

13from concurrent.futures import ProcessPoolExecutor, as_completed 

14 

15import orjson 

16from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler 

17from oc_ocdm.support import get_prefix, get_resource_number, get_short_name 

18from rich_argparse import RichHelpFormatter 

19from tqdm import tqdm 

20 

21from oc_meta.lib.file_manager import collect_zip_files 

22 

23 

def find_max_numbered_folder(path):
    """
    Return the largest integer among the digit-named entries of *path*.

    Entries whose name is not entirely made of digits are ignored;
    -1 is returned when no digit-named entry exists.
    """
    numbered = (int(name) for name in os.listdir(path) if name.isdigit())
    return max(numbered, default=-1)

33 

34def find_max_numbered_zip_file(folder_path: str) -> str | None: 

35 """ 

36 Trova il file zippato con il numero più elevato prima di ".zip" all'interno di una cartella. 

37 """ 

38 max_number = -1 

39 max_zip_file: str | None = None 

40 

41 for file_name in os.listdir(folder_path): 

42 if file_name.endswith(".zip"): 

43 prefix, extension = os.path.splitext(file_name) 

44 if prefix.isdigit(): 

45 number = int(prefix) 

46 if number > max_number: 

47 max_number = number 

48 max_zip_file = file_name 

49 return max_zip_file 

50 

def process_zip_file(zip_file_path):
    """
    Collect the highest provenance snapshot counter per entity from one zip.

    The first entry of the archive is parsed as JSON-LD; for every snapshot
    entity the integer after '/prov/se/' in its URI is recorded, keeping the
    maximum value seen per (supplier prefix, short name, resource number).

    Returns a nested mapping:
    {supplier_prefix: {(short_name, 'se'): {resource_number: max_counter}}}
    """
    updates = {}
    with zipfile.ZipFile(zip_file_path, 'r') as archive:
        entry_name = archive.namelist()[0]
        with archive.open(entry_name) as handle:
            parsed = orjson.loads(handle.read())
    for graph in parsed:
        for entity in graph['@graph']:
            snapshot_uri = entity['@id']
            # The snapshot URI embeds the described entity's URI before '/prov/se/'
            described_uri = snapshot_uri.split('/prov/se/')[0]
            prefix = get_prefix(described_uri)
            number = get_resource_number(described_uri)
            key = (get_short_name(described_uri), 'se')
            counter = int(snapshot_uri.split('/prov/se/')[-1])
            per_entity = updates.setdefault(prefix, {}).setdefault(key, {})
            # Keep only the maximum counter value per entity
            per_entity[number] = max(per_entity.get(number, counter), counter)
    return updates

78 

def explore_directories(root_path, redis_host, redis_port, redis_db):
    """
    Explore the data directories and store the maximum counters in Redis.

    First pass: for each main entity folder ("ar", "br", "ra", "re", "id")
    and each supplier prefix, read the zip file with the highest number and
    record the highest entity number via the Redis counter handler.

    Second pass: process every provenance zip file in parallel, collecting
    the maximum snapshot ("se") counter per entity, merge the per-process
    results and write them to Redis in a single batch update.

    :param root_path: root directory containing the entity folders
    :param redis_host: Redis server host
    :param redis_port: Redis server port
    :param redis_db: Redis database number to use
    """
    main_folders = ["ar", "br", "ra", "re", "id"]
    counter_handler = RedisCounterHandler(host=redis_host, port=redis_port, db=redis_db)

    for main_folder in main_folders:
        main_folder_path = os.path.join(root_path, main_folder)
        if not os.path.isdir(main_folder_path):
            continue
        for supplier_prefix in os.listdir(main_folder_path):
            supplier_path = os.path.join(main_folder_path, supplier_prefix)
            # Skip stray files: only supplier directories contain data
            if not os.path.isdir(supplier_path):
                continue
            max_folder = find_max_numbered_folder(supplier_path)
            # -1 means there is no digit-named sub-folder; previously this
            # crashed with FileNotFoundError when listing '.../-1'
            if max_folder == -1:
                continue
            max_zip_file = find_max_numbered_zip_file(os.path.join(supplier_path, str(max_folder)))
            if max_zip_file is None:
                continue
            zip_file_path = os.path.join(supplier_path, str(max_folder), max_zip_file)
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                first_file = zip_ref.namelist()[0]
                with zip_ref.open(first_file) as entity_file:
                    json_data = orjson.loads(entity_file.read())
                    max_entity = -1
                    for graph in json_data:
                        for entity in graph['@graph']:
                            entity_uri = entity['@id']
                            resource_number = get_resource_number(entity_uri)
                            max_entity = max(max_entity, resource_number)

                    counter_handler.set_counter(max_entity, main_folder, supplier_prefix=supplier_prefix)

    zip_files = collect_zip_files(root_path, only_prov=True)

    # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
    ctx = multiprocessing.get_context('forkserver')
    with ProcessPoolExecutor(mp_context=ctx) as executor:
        future_results = {executor.submit(process_zip_file, zip_file): zip_file
                          for zip_file in zip_files}

        results = []
        with tqdm(total=len(zip_files), desc="Processing provenance zip files") as pbar:
            for future in as_completed(future_results):
                zip_file = future_results[future]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    # Best-effort: report and keep processing the other files
                    print(f"Error processing file {zip_file}: {e}")
                finally:
                    pbar.update(1)

    # Merge the per-process batches, keeping the maximum counter per entity
    final_batch_updates = {}
    with tqdm(total=len(results), desc="Merging results") as pbar:
        for batch in results:
            for supplier_prefix, value in batch.items():
                if supplier_prefix not in final_batch_updates:
                    final_batch_updates[supplier_prefix] = value
                else:
                    for batch_key, inner_value in value.items():
                        if batch_key in final_batch_updates[supplier_prefix]:
                            for identifier, counter_value in inner_value.items():
                                current_value = final_batch_updates[supplier_prefix][batch_key].get(identifier, 0)
                                final_batch_updates[supplier_prefix][batch_key][identifier] = max(current_value, counter_value)
                        else:
                            final_batch_updates[supplier_prefix][batch_key] = inner_value
            pbar.update(1)

    counter_handler.batch_update_counters(final_batch_updates)

146 

def main():
    """CLI entry point: parse the command line and launch the exploration."""
    arg_parser = argparse.ArgumentParser(
        description="Esplora le directory e trova i numeri massimi.",
        formatter_class=RichHelpFormatter,
    )
    arg_parser.add_argument(
        "directory", type=str,
        help="Il percorso della directory da esplorare")
    arg_parser.add_argument(
        "--redis-host", type=str, default="localhost",
        help="L'host del server Redis")
    arg_parser.add_argument(
        "--redis-port", type=int, default=6379,
        help="La porta del server Redis")
    arg_parser.add_argument(
        "--redis-db", type=int, default=6,
        help="Il numero del database Redis da utilizzare")
    options = arg_parser.parse_args()

    explore_directories(options.directory, options.redis_host,
                        options.redis_port, options.redis_db)

159 

# Allow the module to be executed directly as a script.
if __name__ == "__main__": 
    main()