Coverage for oc_meta / run / infodir / gen.py: 65%
114 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2023 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE
17from __future__ import annotations
19import argparse
20import json
21import os
22import zipfile
23from concurrent.futures import as_completed
25from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
26from oc_ocdm.support import get_prefix, get_resource_number, get_short_name
27from pebble import ProcessPool
28from rich_argparse import RichHelpFormatter
29from tqdm import tqdm
def find_max_numbered_folder(path):
    """
    Return the largest purely-numeric entry name found directly inside *path*.

    :param path: directory whose immediate children are inspected
    :return: the highest number among digit-only entry names, or -1 if none exist
    """
    numbered = (int(name) for name in os.listdir(path) if name.isdigit())
    return max(numbered, default=-1)
def find_max_numbered_zip_file(folder_path: str) -> str | None:
    """
    Return the ".zip" file in *folder_path* whose numeric stem is largest.

    Only files named like "<digits>.zip" (e.g. "12.zip") are considered; the
    scan is non-recursive.

    :param folder_path: directory to scan
    :return: the matching file name, or None when no numeric zip file exists
    """
    max_number = -1
    max_zip_file: str | None = None
    for file_name in os.listdir(folder_path):
        if file_name.endswith(".zip"):
            # splitext returns (stem, extension); only the stem matters here.
            stem = os.path.splitext(file_name)[0]
            if stem.isdigit():
                number = int(stem)
                if number > max_number:
                    max_number = number
                    max_zip_file = file_name
    return max_zip_file
def process_zip_file(zip_file_path):
    """
    Collect the highest snapshot ("se") counter per entity from one
    provenance zip archive.

    Only the first file in the archive is read — each archive is assumed to
    hold a single JSON-LD document (TODO: confirm this invariant holds for
    every producer of these archives).

    :param zip_file_path: path to a zip file containing JSON-LD provenance data
    :return: nested mapping
        {supplier_prefix: {(short_name, 'se'): {resource_number: max_counter}}}
    """
    batch_updates = {}
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        first_file = zip_ref.namelist()[0]
        with zip_ref.open(first_file) as entity_file:
            json_data = json.load(entity_file)
            for graph in json_data:
                for entity in graph['@graph']:
                    # Provenance URIs look like <entity_uri>/prov/se/<counter>.
                    prov_entity_uri = entity['@id']
                    entity_uri = prov_entity_uri.split('/prov/se/')[0]
                    supplier_prefix = get_prefix(entity_uri)
                    resource_number = get_resource_number(entity_uri)
                    short_name = get_short_name(entity_uri)
                    counter_value = int(prov_entity_uri.split('/prov/se/')[-1])
                    batch_key = (short_name, 'se')
                    per_key = batch_updates.setdefault(supplier_prefix, {}).setdefault(batch_key, {})
                    # Keep only the maximum counter value seen for each entity.
                    previous = per_key.get(resource_number)
                    per_key[resource_number] = counter_value if previous is None else max(previous, counter_value)
    return batch_updates
def _max_resource_number_in_zip(zip_file_path):
    """Return the highest resource number among entities in the first JSON-LD file of *zip_file_path*, or -1 if it holds none."""
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        first_file = zip_ref.namelist()[0]
        with zip_ref.open(first_file) as entity_file:
            json_data = json.load(entity_file)
    max_entity = -1
    for graph in json_data:
        for entity in graph['@graph']:
            max_entity = max(max_entity, get_resource_number(entity['@id']))
    return max_entity


def _merge_batch(final_batch_updates, batch):
    """Merge one worker's result into *final_batch_updates* in place, keeping the max counter per entity."""
    for supplier_prefix, per_supplier in batch.items():
        if supplier_prefix not in final_batch_updates:
            final_batch_updates[supplier_prefix] = per_supplier
            continue
        target = final_batch_updates[supplier_prefix]
        for batch_key, counters in per_supplier.items():
            if batch_key not in target:
                target[batch_key] = counters
            else:
                existing = target[batch_key]
                for identifier, counter_value in counters.items():
                    existing[identifier] = max(existing.get(identifier, 0), counter_value)


def explore_directories(root_path, redis_host, redis_port, redis_db):
    """
    Walk the data directories and store the maximum counters in Redis.

    Two passes are performed:
    1. Data counters — for each entity type ("ar", "br", "ra", "re", "id")
       and supplier prefix, the highest entity number found in the newest
       zip file of the newest numbered subfolder.
    2. Provenance counters — every zip file under a "prov" directory is
       processed in parallel and the per-entity maximum snapshot counters
       are merged and written in one batch.

    :param root_path: root directory containing the entity-type folders
    :param redis_host: Redis server host
    :param redis_port: Redis server port
    :param redis_db: Redis database number
    """
    main_folders = ["ar", "br", "ra", "re", "id"]
    counter_handler = RedisCounterHandler(host=redis_host, port=redis_port, db=redis_db)

    for main_folder in main_folders:
        main_folder_path = os.path.join(root_path, main_folder)
        if not os.path.isdir(main_folder_path):
            continue
        for supplier_prefix in os.listdir(main_folder_path):
            supplier_path = os.path.join(main_folder_path, supplier_prefix)
            # Skip stray files: only directories can hold numbered subfolders.
            if not os.path.isdir(supplier_path):
                continue
            max_folder = find_max_numbered_folder(supplier_path)
            if max_folder == -1:
                # No numbered subfolder at all — nothing to count here
                # (previously this crashed trying to list a "-1" directory).
                continue
            max_zip_file = find_max_numbered_zip_file(os.path.join(supplier_path, str(max_folder)))
            if max_zip_file is None:
                continue
            zip_file_path = os.path.join(supplier_path, str(max_folder), max_zip_file)
            max_entity = _max_resource_number_in_zip(zip_file_path)
            counter_handler.set_counter(max_entity, main_folder, supplier_prefix=supplier_prefix)

    # Provenance pass: every zip that lives under a directory whose path
    # contains "prov".
    zip_files = [os.path.join(dp, dn_f) for dp, dn, filenames in os.walk(root_path)
                 for dn_f in filenames if dn_f.endswith('.zip') and 'prov' in dp]

    with ProcessPool() as pool:
        future_results = {pool.schedule(process_zip_file, args=[zip_file]): zip_file
                          for zip_file in zip_files}

        results = []
        with tqdm(total=len(zip_files), desc="Processing provenance zip files") as pbar:
            for future in as_completed(future_results):
                zip_file = future_results[future]  # type: ignore[index]
                try:
                    results.append(future.result())
                except Exception as e:
                    # Best effort: report the failure and keep going with the
                    # remaining archives rather than aborting the whole run.
                    print(f"Error processing file {zip_file}: {e}")
                finally:
                    pbar.update(1)

    final_batch_updates = {}
    with tqdm(total=len(results), desc="Merging results") as pbar:
        for batch in results:
            _merge_batch(final_batch_updates, batch)
            pbar.update(1)

    counter_handler.batch_update_counters(final_batch_updates)
def main():
    """Command-line entry point: parse the arguments and update the counters."""
    arg_parser = argparse.ArgumentParser(
        description="Esplora le directory e trova i numeri massimi.",
        formatter_class=RichHelpFormatter,
    )
    arg_parser.add_argument("directory", type=str, help="Il percorso della directory da esplorare")
    # Optional Redis connection settings, all with sensible defaults.
    for flag, kwargs in (
        ("--redis-host", dict(type=str, default="localhost", help="L'host del server Redis")),
        ("--redis-port", dict(type=int, default=6379, help="La porta del server Redis")),
        ("--redis-db", dict(type=int, default=6, help="Il numero del database Redis da utilizzare")),
    ):
        arg_parser.add_argument(flag, **kwargs)
    cli_args = arg_parser.parse_args()
    explore_directories(cli_args.directory, cli_args.redis_host, cli_args.redis_port, cli_args.redis_db)
# Script entry point.
if __name__ == "__main__":
    main()