Coverage for oc_meta / run / infodir / check.py: 0%
53 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import zipfile
7import multiprocessing
9from oc_ocdm.support import get_prefix, get_resource_number, get_short_name
10from rdflib import Dataset, URIRef
11from rdflib.namespace import PROV, RDF
12from redis import Redis
13from rich_argparse import RichHelpFormatter
14from tqdm import tqdm
16from oc_meta.lib.file_manager import collect_zip_files
def process_zip_file(args):
    """Check one provenance zip archive against Redis.

    Every member of the archive is parsed as JSON-LD. For each subject typed
    as ``prov:Entity`` the expected Redis key is built and looked up; entities
    whose key does not exist are printed and collected.

    Args:
        args: A ``(zip_file, redis_host, redis_port, redis_db)`` tuple. The
            values are packed into a single argument so the function can be
            mapped over a list of jobs with ``Pool.imap``.

    Returns:
        A list of dicts, one per entity whose expected key is absent from
        Redis, with the keys ``"URI"``, ``"Prov URI"`` and
        ``"Chiave Redis attesa"``.
    """
    zip_file, redis_host, redis_port, redis_db = args
    redis_client = Redis(host=redis_host, port=redis_port, db=redis_db)
    missing_entities = []

    try:
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            for member in zip_ref.infolist():
                # Directory entries carry no data and would fail JSON-LD parsing.
                if member.is_dir():
                    continue

                with zip_ref.open(member) as entity_file:
                    g = Dataset(default_union=True)
                    g.parse(data=entity_file.read(), format='json-ld')

                for s, _, _ in g.triples((None, RDF.type, PROV.Entity)):
                    prov_entity_uri = str(s)
                    # The described entity is the part of the snapshot URI
                    # that precedes '/prov/se/'.
                    entity_uri = prov_entity_uri.split('/prov/se/')[0]
                    entity_uri_ref = URIRef(entity_uri)
                    supplier_prefix = get_prefix(entity_uri_ref)
                    short_name = get_short_name(entity_uri_ref)
                    resource_number = get_resource_number(entity_uri_ref)

                    expected_key = f"{short_name}:{supplier_prefix}:{resource_number}:se"

                    if not redis_client.exists(expected_key):
                        print("\nEntità mancante trovata:")
                        print(f"URI: {entity_uri}")
                        print(f"Prov URI: {prov_entity_uri}")
                        print(f"Chiave Redis attesa: {expected_key}")
                        print("---")

                        missing_entities.append({
                            "URI": entity_uri,
                            "Prov URI": prov_entity_uri,
                            "Chiave Redis attesa": expected_key
                        })
    finally:
        # A client is created per archive; release its connections explicitly
        # so they do not accumulate in long-lived pool workers.
        redis_client.close()

    return missing_entities
def explore_provenance_files(root_path, redis_host, redis_port, redis_db):
    """Check every provenance zip under ``root_path`` and print the total.

    The archives returned by ``collect_zip_files(root_path, only_prov=True)``
    are handed to ``process_zip_file`` through a process pool sized to the
    CPU count, with a tqdm progress bar. The number of missing entities
    summed over all archives is printed; nothing is returned.

    Args:
        root_path: Directory passed to ``collect_zip_files``.
        redis_host: Redis host forwarded to each worker job.
        redis_port: Redis port forwarded to each worker job.
        redis_db: Redis database number forwarded to each worker job.
    """
    # One job tuple per archive, in the shape process_zip_file unpacks.
    jobs = []
    for archive in collect_zip_files(root_path, only_prov=True):
        jobs.append((archive, redis_host, redis_port, redis_db))

    # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
    mp_context = multiprocessing.get_context('forkserver')
    worker_count = multiprocessing.cpu_count()
    with mp_context.Pool(processes=worker_count) as pool:
        progress = tqdm(
            pool.imap(process_zip_file, jobs),
            total=len(jobs),
            desc="Processing provenance zip files",
        )
        per_archive_missing = list(progress)

    missing_total = sum(len(found) for found in per_archive_missing)

    print(f"\nTotale entità mancanti trovate: {missing_total}")
def main():
    """Parse the command-line options and run the provenance check."""
    cli = argparse.ArgumentParser(
        formatter_class=RichHelpFormatter,
        description="Verifica la presenza di entità di provenance in Redis.",
    )
    cli.add_argument("directory", type=str, help="Il percorso della directory da esplorare")

    # Redis connection options: (flag, type, default, help text).
    redis_options = (
        ("--redis-host", str, "localhost", "L'host del server Redis"),
        ("--redis-port", int, 6379, "La porta del server Redis"),
        ("--redis-db", int, 6, "Il numero del database Redis da utilizzare"),
    )
    for flag, value_type, default_value, help_text in redis_options:
        cli.add_argument(flag, type=value_type, default=default_value, help=help_text)

    options = cli.parse_args()

    explore_provenance_files(
        options.directory,
        options.redis_host,
        options.redis_port,
        options.redis_db,
    )


# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()