Coverage for oc_meta / run / find / duplicated_entities.py: 0%
132 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1import argparse
2import csv
3import json
4import logging
5import os
6import zipfile
8from rich_argparse import RichHelpFormatter
9from tqdm import tqdm
# All problems encountered during the scan are logged to this file instead of
# being raised: the whole pipeline is best-effort per ZIP/JSON entry.
logging.basicConfig(filename='error_log_find_duplicated_resources.txt', level=logging.ERROR,
                    format='%(asctime)s - %(levelname)s - %(message)s')
class UnionFind:
    """Disjoint-set (union-find) structure with path compression and union by rank.

    Elements are created lazily on first ``find``; any hashable value works.
    """

    def __init__(self):
        # item -> parent item; a root is its own parent
        self.parent = {}
        # item -> rank (upper bound on tree height), keeps trees shallow
        self.rank = {}

    def find(self, item):
        """Return the representative (root) of *item*'s set.

        Unknown items become singleton sets.  The path walked is compressed
        so later lookups are near O(1).
        """
        parent = self.parent
        if item not in parent:
            parent[item] = item
            self.rank[item] = 0
            return item
        # First pass: locate the root.
        root = item
        while parent[root] != root:
            root = parent[root]
        # Second pass: repoint every node on the path directly at the root.
        while parent[item] != root:
            parent[item], item = root, parent[item]
        return root

    def union(self, x, y):
        """Merge the sets containing *x* and *y* (no-op if already merged)."""
        rx = self.find(x)
        ry = self.find(y)
        if rx == ry:
            return
        # Attach the shallower tree under the deeper one; on a tie the first
        # root wins and its rank grows by one.
        if self.rank[rx] < self.rank[ry]:
            rx, ry = ry, rx
        self.parent[ry] = rx
        if self.rank[rx] == self.rank[ry]:
            self.rank[rx] += 1
def read_and_analyze_zip_files(folder_path, csv_path, resource_type):
    """Scan zipped JSON dumps and write groups of duplicated entities to CSV.

    Depending on *resource_type* ('br', 'ra' or 'both'), the 'br' and/or 'ra'
    subfolders of *folder_path* are processed; identifiers collected per
    entity are then reduced to duplicate groups and saved to *csv_path*.
    """
    resources = {}
    # Process 'br' before 'ra', mirroring the order the CLI documents.
    for subdir in ('br', 'ra'):
        if resource_type in (subdir, 'both'):
            process_folder(os.path.join(folder_path, subdir), resources, subdir)
    save_duplicates_to_csv(resources, csv_path)
def process_folder(folder_path, resources, expected_type):
    """Feed every JSON file inside every candidate ZIP under *folder_path*
    to ``analyze_json``, accumulating identifiers into *resources*.

    A missing folder, a corrupt ZIP or an unparsable JSON member is logged
    and skipped — this function never raises.
    """
    if not os.path.exists(folder_path):
        logging.error(f"La sottocartella '{expected_type}' non esiste nel percorso: {folder_path}")
        return

    for zip_path in tqdm(get_zip_files(folder_path), desc=f"Analizzando i file ZIP in {expected_type}"):
        try:
            with zipfile.ZipFile(zip_path, 'r') as archive:
                for member in archive.namelist():
                    try:
                        with archive.open(member) as fh:
                            payload = json.load(fh)
                        analyze_json(payload, resources, zip_path, member, expected_type)
                    except json.JSONDecodeError:
                        logging.error(f"Errore nel parsing JSON del file {member} in {zip_path}")
                    except Exception as e:
                        logging.error(f"Errore nell'elaborazione del file {member} in {zip_path}: {str(e)}")
        except zipfile.BadZipFile:
            logging.error(f"File ZIP corrotto o non valido: {zip_path}")
        except Exception as e:
            logging.error(f"Errore nell'apertura del file ZIP {zip_path}: {str(e)}")
def get_zip_files(folder_path):
    """Return the paths of every '*.zip' file under *folder_path*, recursively.

    Files named exactly 'se.zip' are excluded (deliberate skip in the
    original scan).  Order follows ``os.walk`` traversal.

    :param folder_path: root directory to search (may not exist — yields []).
    :return: list of absolute/relative ZIP paths joined from the walk roots.
    """
    # Comprehension replaces the manual append loop; `name != 'se.zip'`
    # replaces the unidiomatic `not name == 'se.zip'`.
    return [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(folder_path)
        for name in names
        if name.endswith('.zip') and name != 'se.zip'
    ]
def analyze_json(data, resources, zip_path, zip_file, expected_type):
    """Record the identifiers of every entity of type *expected_type*.

    Walks each "@graph" in the JSON-LD document *data* and updates
    *resources* (entity @id -> set of identifier @ids).  Entities with no
    declared type are assumed to be *expected_type* (reported on stdout);
    per-entity failures are logged and do not stop the walk.
    """
    for graph in data:
        for entity in graph.get("@graph", []):
            try:
                entity_id = entity["@id"]
                entity_type = get_entity_type(entity)
                if entity_type is None:
                    print(f"Tipo non specificato per l'entità {entity_id} nel file {zip_file} all'interno di {zip_path}. Assumendo tipo {expected_type}.")
                    entity_type = expected_type
                # Guard clause instead of nesting: skip entities of the other type.
                if entity_type != expected_type:
                    continue
                resources.setdefault(entity_id, set()).update(get_identifiers(entity))
            except KeyError as e:
                logging.error(f"Chiave mancante nell'entità {entity.get('@id', 'ID sconosciuto')} "
                              f"nel file {zip_file} all'interno di {zip_path}: {str(e)}")
            except Exception as e:
                logging.error(f"Errore nell'analisi dell'entità {entity.get('@id', 'ID sconosciuto')} "
                              f"nel file {zip_file} all'interno di {zip_path}: {str(e)}")
def get_entity_type(entity):
    """Map a JSON-LD entity's @type list to a short resource code.

    :param entity: dict possibly carrying an "@type" list of IRIs.
    :return: 'br' for fabio:Expression, 'ra' for foaf:Agent, else None.
    """
    # Hoisted: the original called entity.get("@type", []) once per branch.
    types = entity.get("@type", [])
    if "http://purl.org/spar/fabio/Expression" in types:
        return 'br'
    if "http://xmlns.com/foaf/0.1/Agent" in types:
        return 'ra'
    return None
def get_identifiers(entity):
    """Return the @id of every well-formed datacite:hasIdentifier entry.

    Entries that are not dicts or lack an "@id" key are silently skipped.
    """
    linked = entity.get("http://purl.org/spar/datacite/hasIdentifier", [])
    return [item["@id"] for item in linked if isinstance(item, dict) and "@id" in item]
def save_duplicates_to_csv(resources, csv_path):
    """Write duplicate groups to *csv_path* as a two-column CSV.

    Columns: 'surviving_entity' (first member of each sorted group) and
    'merged_entities' (remaining members joined by '; ').  Any I/O failure
    is logged rather than raised.
    """
    try:
        with open(csv_path, mode='w', newline='', encoding='utf-8') as out:
            writer = csv.writer(out)
            writer.writerow(['surviving_entity', 'merged_entities'])
            for survivor, *merged in find_duplicates(resources):
                writer.writerow([survivor, '; '.join(merged)])
    except Exception as e:
        logging.error(f"Errore nel salvataggio del file CSV {csv_path}: {str(e)}")
def find_duplicates(resources):
    """Group entities that (transitively) share at least one identifier.

    *resources* maps each entity id to its set of identifier ids.  Two
    entities belong to the same group when a chain of shared identifiers
    connects them.

    :return: list of sorted groups, each with at least two entities.
    """
    uf = UnionFind()
    # Union every entity with each of its identifiers: entities sharing an
    # identifier thus end up under the same representative.
    for entity_id, id_set in resources.items():
        for ident in id_set:
            uf.union(entity_id, ident)

    # Bucket the entities (identifiers are dropped here) by representative.
    clusters = {}
    for entity_id in resources:
        clusters.setdefault(uf.find(entity_id), []).append(entity_id)

    # Singleton buckets are not duplicates.
    return [sorted(members) for members in clusters.values() if len(members) > 1]
def main():
    """CLI entry point: parse the arguments and launch the duplicate scan."""
    arg_parser = argparse.ArgumentParser(
        description="Trova risorse duplicate in base ai loro ID.",
        formatter_class=RichHelpFormatter,
    )
    arg_parser.add_argument("folder_path", type=str, help="Percorso della cartella contenente le sottocartelle 'br' e 'ra'")
    arg_parser.add_argument("csv_path", type=str, help="Percorso del file CSV per salvare i duplicati")
    arg_parser.add_argument("resource_type", type=str, choices=['br', 'ra', 'both'],
                            help="Tipo di risorsa da analizzare: 'br' per risorse bibliografiche, 'ra' per agenti responsabili, 'both' per entrambi")
    cli_args = arg_parser.parse_args()
    read_and_analyze_zip_files(cli_args.folder_path, cli_args.csv_path, cli_args.resource_type)
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()