Coverage for oc_meta / run / infodir / check.py: 0%

53 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import zipfile 

7import multiprocessing 

8 

9from oc_ocdm.support import get_prefix, get_resource_number, get_short_name 

10from rdflib import Dataset, URIRef 

11from rdflib.namespace import PROV, RDF 

12from redis import Redis 

13from rich_argparse import RichHelpFormatter 

14from tqdm import tqdm 

15 

16from oc_meta.lib.file_manager import collect_zip_files 

17 

18 

19def process_zip_file(args): 

20 zip_file, redis_host, redis_port, redis_db = args 

21 redis_client = Redis(host=redis_host, port=redis_port, db=redis_db) 

22 missing_entities = [] 

23 

24 with zipfile.ZipFile(zip_file, 'r') as zip_ref: 

25 for file_name in zip_ref.namelist(): 

26 with zip_ref.open(file_name) as entity_file: 

27 g = Dataset(default_union=True) 

28 g.parse(data=entity_file.read(), format='json-ld') 

29 

30 for s, p, o in g.triples((None, RDF.type, PROV.Entity)): 

31 prov_entity_uri = str(s) 

32 entity_uri = prov_entity_uri.split('/prov/se/')[0] 

33 entity_uri_ref = URIRef(entity_uri) 

34 supplier_prefix = get_prefix(entity_uri_ref) 

35 short_name = get_short_name(entity_uri_ref) 

36 resource_number = get_resource_number(entity_uri_ref) 

37 

38 expected_key = f"{short_name}:{supplier_prefix}:{resource_number}:se" 

39 

40 if not redis_client.exists(expected_key): 

41 print("\nEntità mancante trovata:") 

42 print(f"URI: {entity_uri}") 

43 print(f"Prov URI: {prov_entity_uri}") 

44 print(f"Chiave Redis attesa: {expected_key}") 

45 print("---") 

46 

47 missing_entities.append({ 

48 "URI": entity_uri, 

49 "Prov URI": prov_entity_uri, 

50 "Chiave Redis attesa": expected_key 

51 }) 

52 

53 return missing_entities 

54 

55def explore_provenance_files(root_path, redis_host, redis_port, redis_db): 

56 prov_zip_files = collect_zip_files(root_path, only_prov=True) 

57 

58 args_list = [(zip_file, redis_host, redis_port, redis_db) for zip_file in prov_zip_files] 

59 

60 # Use forkserver to avoid deadlocks when forking in a multi-threaded environment 

61 ctx = multiprocessing.get_context('forkserver') 

62 with ctx.Pool(processes=multiprocessing.cpu_count()) as pool: 

63 results = list(tqdm(pool.imap(process_zip_file, args_list), total=len(args_list), desc="Processing provenance zip files")) 

64 

65 all_missing_entities = [item for sublist in results for item in sublist] 

66 

67 print(f"\nTotale entità mancanti trovate: {len(all_missing_entities)}") 

68 

69def main(): 

70 parser = argparse.ArgumentParser( 

71 description="Verifica la presenza di entità di provenance in Redis.", 

72 formatter_class=RichHelpFormatter, 

73 ) 

74 parser.add_argument("directory", type=str, help="Il percorso della directory da esplorare") 

75 parser.add_argument("--redis-host", type=str, default="localhost", help="L'host del server Redis") 

76 parser.add_argument("--redis-port", type=int, default=6379, help="La porta del server Redis") 

77 parser.add_argument("--redis-db", type=int, default=6, help="Il numero del database Redis da utilizzare") 

78 args = parser.parse_args() 

79 

80 explore_provenance_files(args.directory, args.redis_host, args.redis_port, args.redis_db) 

81 

82if __name__ == "__main__": 

83 main()