Coverage for oc_meta/run/infodir/gen.py: 66%
116 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7from __future__ import annotations
9import argparse
10import multiprocessing
11import os
12import zipfile
13from concurrent.futures import ProcessPoolExecutor, as_completed
15import orjson
16from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
17from oc_ocdm.support import get_prefix, get_resource_number, get_short_name
18from rich_argparse import RichHelpFormatter
19from tqdm import tqdm
21from oc_meta.lib.file_manager import collect_zip_files
def find_max_numbered_folder(path):
    """Return the largest integer among digit-named entries of *path*.

    Entries whose name is not made purely of digits are ignored.
    Returns -1 when no digit-named entry exists.
    """
    numbered = (int(name) for name in os.listdir(path) if name.isdigit())
    return max(numbered, default=-1)
34def find_max_numbered_zip_file(folder_path: str) -> str | None:
35 """
36 Trova il file zippato con il numero più elevato prima di ".zip" all'interno di una cartella.
37 """
38 max_number = -1
39 max_zip_file: str | None = None
41 for file_name in os.listdir(folder_path):
42 if file_name.endswith(".zip"):
43 prefix, extension = os.path.splitext(file_name)
44 if prefix.isdigit():
45 number = int(prefix)
46 if number > max_number:
47 max_number = number
48 max_zip_file = file_name
49 return max_zip_file
def process_zip_file(zip_file_path):
    """Extract the highest provenance-snapshot counter per entity from one zip.

    Reads the first JSON-LD member of *zip_file_path* (a provenance archive
    whose entity IDs contain ``/prov/se/<counter>``) and returns a nested dict
    ``{supplier_prefix: {(short_name, 'se'): {resource_number: max_counter}}}``.

    Returns an empty dict for an archive with no members (previously this
    raised IndexError on ``namelist()[0]``).
    """
    batch_updates = {}
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        names = zip_ref.namelist()
        if not names:
            # Empty archive: nothing to count.
            return batch_updates
        with zip_ref.open(names[0]) as entity_file:
            json_data = orjson.loads(entity_file.read())
    for graph in json_data:
        for entity in graph['@graph']:
            prov_entity_uri = entity['@id']
            # The part before '/prov/se/' is the described entity's URI.
            entity_uri = prov_entity_uri.split('/prov/se/')[0]
            supplier_prefix = get_prefix(entity_uri)
            resource_number = get_resource_number(entity_uri)
            batch_key = (get_short_name(entity_uri), 'se')
            # The trailing path segment is the snapshot counter.
            counter_value = int(prov_entity_uri.split('/prov/se/')[-1])
            per_key = batch_updates.setdefault(supplier_prefix, {}).setdefault(batch_key, {})
            # Keep the maximum counter value seen for each entity.
            per_key[resource_number] = max(per_key.get(resource_number, counter_value), counter_value)
    return batch_updates
def _merge_batch_updates(results):
    """Merge per-file batch dicts, keeping the max counter per entity.

    *results* is a list of dicts as returned by ``process_zip_file``.
    """
    merged = {}
    with tqdm(total=len(results), desc="Merging results") as pbar:
        for batch in results:
            for supplier_prefix, value in batch.items():
                if supplier_prefix not in merged:
                    merged[supplier_prefix] = value
                else:
                    for batch_key, inner_value in value.items():
                        if batch_key in merged[supplier_prefix]:
                            for identifier, counter_value in inner_value.items():
                                current_value = merged[supplier_prefix][batch_key].get(identifier, 0)
                                merged[supplier_prefix][batch_key][identifier] = max(current_value, counter_value)
                        else:
                            merged[supplier_prefix][batch_key] = inner_value
            pbar.update(1)
    return merged

def explore_directories(root_path, redis_host, redis_port, redis_db):
    """Explore the data directories and record per-supplier maximum counters.

    For each entity type folder (ar/br/ra/re/id) and supplier prefix, reads the
    newest data zip to find the highest resource number and stores it in Redis;
    then scans all provenance zips in parallel and batch-updates the snapshot
    counters.

    Robustness fixes: non-directory entries under a main folder are skipped,
    suppliers with no numbered subfolder are skipped (previously the code tried
    to list a nonexistent ``…/-1`` folder), and empty zip archives are ignored.
    """
    main_folders = ["ar", "br", "ra", "re", "id"]
    counter_handler = RedisCounterHandler(host=redis_host, port=redis_port, db=redis_db)

    for main_folder in main_folders:
        main_folder_path = os.path.join(root_path, main_folder)
        if not os.path.isdir(main_folder_path):
            continue
        for supplier_prefix in os.listdir(main_folder_path):
            supplier_path = os.path.join(main_folder_path, supplier_prefix)
            if not os.path.isdir(supplier_path):
                continue  # ignore stray files next to the supplier folders
            max_folder = find_max_numbered_folder(supplier_path)
            if max_folder == -1:
                continue  # no numbered subfolder: nothing to count here
            max_zip_file = find_max_numbered_zip_file(os.path.join(supplier_path, str(max_folder)))
            if max_zip_file is None:
                continue
            zip_file_path = os.path.join(supplier_path, str(max_folder), max_zip_file)
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                names = zip_ref.namelist()
                if not names:
                    continue  # empty archive: skip instead of crashing
                with zip_ref.open(names[0]) as entity_file:
                    json_data = orjson.loads(entity_file.read())
                max_entity = -1
                for graph in json_data:
                    for entity in graph['@graph']:
                        entity_uri = entity['@id']
                        resource_number = get_resource_number(entity_uri)
                        max_entity = max(max_entity, resource_number)
            counter_handler.set_counter(max_entity, main_folder, supplier_prefix=supplier_prefix)

    zip_files = collect_zip_files(root_path, only_prov=True)

    # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
    ctx = multiprocessing.get_context('forkserver')
    results = []
    with ProcessPoolExecutor(mp_context=ctx) as executor:
        future_results = {executor.submit(process_zip_file, zip_file): zip_file
                          for zip_file in zip_files}
        with tqdm(total=len(zip_files), desc="Processing provenance zip files") as pbar:
            for future in as_completed(future_results):
                zip_file = future_results[future]
                try:
                    results.append(future.result())
                except Exception as e:
                    # Best effort: report the failing archive and keep going.
                    print(f"Error processing file {zip_file}: {e}")
                finally:
                    pbar.update(1)

    final_batch_updates = _merge_batch_updates(results)
    counter_handler.batch_update_counters(final_batch_updates)
def main():
    """Command-line entry point: parse arguments and run the exploration."""
    arg_parser = argparse.ArgumentParser(
        description="Esplora le directory e trova i numeri massimi.",
        formatter_class=RichHelpFormatter,
    )
    arg_parser.add_argument("directory", type=str, help="Il percorso della directory da esplorare")
    arg_parser.add_argument("--redis-host", type=str, default="localhost", help="L'host del server Redis")
    arg_parser.add_argument("--redis-port", type=int, default=6379, help="La porta del server Redis")
    arg_parser.add_argument("--redis-db", type=int, default=6, help="Il numero del database Redis da utilizzare")
    parsed = arg_parser.parse_args()
    explore_directories(parsed.directory, parsed.redis_host, parsed.redis_port, parsed.redis_db)
# Run only when executed as a script, not on import.
if __name__ == "__main__":
    main()