Coverage for oc_meta/run/infodir/gen.py: 65%

114 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2023 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE 

16 

17from __future__ import annotations 

18 

19import argparse 

20import json 

21import os 

22import zipfile 

23from concurrent.futures import as_completed 

24 

25from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler 

26from oc_ocdm.support import get_prefix, get_resource_number, get_short_name 

27from pebble import ProcessPool 

28from rich_argparse import RichHelpFormatter 

29from tqdm import tqdm 

30 

31 

def find_max_numbered_folder(path):
    """Return the largest integer among the all-digit entry names in *path*.

    Every entry of *path* (files included) whose name is purely numeric is
    considered; returns -1 when no such entry exists.
    """
    numbered = (int(entry) for entry in os.listdir(path) if entry.isdigit())
    return max(numbered, default=-1)

41 

def find_max_numbered_zip_file(folder_path: str) -> str | None:
    """Return the ``<number>.zip`` file name with the highest numeric stem.

    Scans *folder_path* for files named like ``123.zip``; among those, the
    one with the greatest integer stem is returned. Returns None when the
    folder contains no such file.
    """
    best_name: str | None = None
    best_number = -1
    for candidate in os.listdir(folder_path):
        stem, extension = os.path.splitext(candidate)
        if extension == ".zip" and stem.isdigit() and int(stem) > best_number:
            best_number = int(stem)
            best_name = candidate
    return best_name

58 

def process_zip_file(zip_file_path):
    """Collect the highest provenance snapshot number seen for each entity.

    Reads the first JSON-LD member of the zip archive at *zip_file_path* and,
    for every entity in every ``@graph``, records the maximum value of the
    trailing number in its ``/prov/se/<n>`` identifier.

    Returns a nested dict shaped as::

        {supplier_prefix: {(short_name, 'se'): {resource_number: max_counter}}}
    """
    updates = {}
    with zipfile.ZipFile(zip_file_path, 'r') as archive:
        member_name = archive.namelist()[0]
        with archive.open(member_name) as handle:
            graphs = json.load(handle)
        for graph in graphs:
            for entity in graph['@graph']:
                prov_uri = entity['@id']
                # Base entity URI is everything before the provenance suffix;
                # the snapshot counter is the number after it.
                base_uri = prov_uri.split('/prov/se/')[0]
                snapshot_number = int(prov_uri.split('/prov/se/')[-1])
                batch_key = (get_short_name(base_uri), 'se')
                per_prefix = updates.setdefault(get_prefix(base_uri), {})
                per_key = per_prefix.setdefault(batch_key, {})
                res_number = get_resource_number(base_uri)
                previous = per_key.get(res_number)
                # Keep only the maximum counter value per entity.
                per_key[res_number] = (
                    snapshot_number if previous is None
                    else max(previous, snapshot_number)
                )
    return updates

86 

def explore_directories(root_path, redis_host, redis_port, redis_db):
    """Rebuild the entity and provenance counters stored in Redis.

    Walks the OpenCitations Meta data layout under *root_path*: for each main
    entity folder (``ar``/``br``/``ra``/``re``/``id``) and each supplier
    prefix, it reads the newest data zip to find the highest entity number
    and stores it via the counter handler; then it scans every provenance
    zip in parallel, merges the per-entity maximum snapshot counters, and
    writes them to Redis in a single batch.

    :param root_path: root directory of the dataset to explore
    :param redis_host: Redis server host
    :param redis_port: Redis server port
    :param redis_db: Redis database number
    """
    main_folders = ["ar", "br", "ra", "re", "id"]
    counter_handler = RedisCounterHandler(host=redis_host, port=redis_port, db=redis_db)

    for main_folder in main_folders:
        main_folder_path = os.path.join(root_path, main_folder)
        if not os.path.isdir(main_folder_path):
            continue
        for supplier_prefix in os.listdir(main_folder_path):
            supplier_path = os.path.join(main_folder_path, supplier_prefix)
            # Robustness fix: a stray file among the supplier folders would
            # previously make os.listdir crash inside find_max_numbered_folder.
            if not os.path.isdir(supplier_path):
                continue
            max_folder = find_max_numbered_folder(supplier_path)
            # Robustness fix: with no numbered subfolder, max_folder is -1 and
            # joining "-1" raised FileNotFoundError in find_max_numbered_zip_file.
            if max_folder == -1:
                continue
            max_zip_file = find_max_numbered_zip_file(os.path.join(supplier_path, str(max_folder)))
            if max_zip_file is None:
                continue
            zip_file_path = os.path.join(supplier_path, str(max_folder), max_zip_file)
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                first_file = zip_ref.namelist()[0]
                with zip_ref.open(first_file) as entity_file:
                    json_data = json.load(entity_file)
                    max_entity = -1
                    for graph in json_data:
                        for entity in graph['@graph']:
                            entity_uri = entity['@id']
                            resource_number = get_resource_number(entity_uri)
                            max_entity = max(max_entity, resource_number)
                    counter_handler.set_counter(max_entity, main_folder, supplier_prefix=supplier_prefix)

    # NOTE(review): the substring test matches any path containing "prov"
    # (e.g. a folder named "provider"), not only "prov" path components —
    # confirm the dataset layout before tightening this.
    zip_files = [os.path.join(dp, f) for dp, dn, filenames in os.walk(root_path)
                 for f in filenames if f.endswith('.zip') and 'prov' in dp]

    with ProcessPool() as pool:
        future_results = {pool.schedule(process_zip_file, args=[zip_file]): zip_file
                          for zip_file in zip_files}

        results = []
        with tqdm(total=len(zip_files), desc="Processing provenance zip files") as pbar:
            for future in as_completed(future_results):
                zip_file = future_results[future]  # type: ignore[index]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    # Best effort: report and keep processing the other zips.
                    print(f"Error processing file {zip_file}: {e}")
                finally:
                    pbar.update(1)

    # Merge the per-process dicts, keeping the maximum counter per entity.
    final_batch_updates = {}
    with tqdm(total=len(results), desc="Merging results") as pbar:
        for batch in results:
            for supplier_prefix, value in batch.items():
                if supplier_prefix not in final_batch_updates:
                    final_batch_updates[supplier_prefix] = value
                else:
                    for batch_key, inner_value in value.items():
                        if batch_key in final_batch_updates[supplier_prefix]:
                            for identifier, counter_value in inner_value.items():
                                current_value = final_batch_updates[supplier_prefix][batch_key].get(identifier, 0)
                                final_batch_updates[supplier_prefix][batch_key][identifier] = max(current_value, counter_value)
                        else:
                            final_batch_updates[supplier_prefix][batch_key] = inner_value
            pbar.update(1)

    counter_handler.batch_update_counters(final_batch_updates)

153 

def main():
    """Parse command-line arguments and launch the directory exploration."""
    cli = argparse.ArgumentParser(
        description="Esplora le directory e trova i numeri massimi.",
        formatter_class=RichHelpFormatter,
    )
    cli.add_argument("directory", type=str, help="Il percorso della directory da esplorare")
    cli.add_argument("--redis-host", type=str, default="localhost", help="L'host del server Redis")
    cli.add_argument("--redis-port", type=int, default=6379, help="La porta del server Redis")
    cli.add_argument("--redis-db", type=int, default=6, help="Il numero del database Redis da utilizzare")
    options = cli.parse_args()

    explore_directories(options.directory, options.redis_host, options.redis_port, options.redis_db)

166 

# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()