Coverage for oc_meta/run/find/duplicated_entities.py: 0%
128 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import csv
7import logging
8import os
9import zipfile
11import orjson
12from rich_argparse import RichHelpFormatter
13from tqdm import tqdm
15from oc_meta.lib.file_manager import collect_files
# Route all error-level messages to a dedicated log file so that per-file
# failures during the scan are recorded without interrupting the run.
logging.basicConfig(filename='error_log_find_duplicated_resources.txt', level=logging.ERROR,
                    format='%(asctime)s - %(levelname)s - %(message)s')
class UnionFind:
    """Disjoint-set (union-find) structure with union by rank and path compression.

    Used to cluster entities that transitively share at least one identifier.
    """

    def __init__(self):
        self.parent = {}  # item -> parent item (roots point to themselves)
        self.rank = {}    # root -> upper bound on its tree height

    def find(self, item):
        """Return the representative (root) of *item*'s set.

        Unknown items are lazily registered as singleton sets.
        Implemented iteratively (walk to the root, then compress the path)
        so that very long parent chains cannot raise RecursionError, which
        the naive recursive formulation is vulnerable to.
        """
        if item not in self.parent:
            self.parent[item] = item
            self.rank[item] = 0
            return item
        # Walk up to the root of the tree.
        root = item
        while self.parent[root] != root:
            root = self.parent[root]
        # Path compression: re-point every node on the walked path at the root.
        while self.parent[item] != root:
            self.parent[item], item = root, self.parent[item]
        return root

    def union(self, x, y):
        """Merge the sets containing *x* and *y* (no-op if already merged)."""
        xroot = self.find(x)
        yroot = self.find(y)
        if xroot == yroot:
            return
        # Union by rank: attach the shallower tree under the deeper one; ranks
        # only grow when two trees of equal rank are merged.
        if self.rank[xroot] < self.rank[yroot]:
            self.parent[xroot] = yroot
        else:
            self.parent[yroot] = xroot
            if self.rank[xroot] == self.rank[yroot]:
                self.rank[xroot] += 1
def read_and_analyze_zip_files(folder_path, csv_path, resource_type):
    """Scan the requested entity sub-folders and write duplicate groups to CSV.

    :param folder_path: root folder containing the 'br' and/or 'ra' sub-folders
    :param csv_path: destination path of the CSV duplicate report
    :param resource_type: which entities to scan — 'br', 'ra' or 'both'
    """
    resources = {}
    # 'br' is always processed before 'ra' when both are requested.
    for subfolder in ('br', 'ra'):
        if resource_type in (subfolder, 'both'):
            process_folder(os.path.join(folder_path, subfolder), resources, subfolder)
    save_duplicates_to_csv(resources, csv_path)
def process_folder(folder_path, resources, expected_type):
    """Walk every ZIP archive under *folder_path* and collect entity identifiers.

    Missing folders, corrupt archives and unreadable members are logged and
    skipped so a single bad file never aborts the whole scan.
    """
    if not os.path.exists(folder_path):
        logging.error(f"La sottocartella '{expected_type}' non esiste nel percorso: {folder_path}")
        return

    def _process_member(zip_ref, zip_path, zip_file):
        # Parse a single JSON member and feed it to the analyzer; failures are
        # logged and the member is skipped.
        try:
            with zip_ref.open(zip_file) as json_file:
                data = orjson.loads(json_file.read())
                analyze_json(data, resources, zip_path, zip_file, expected_type)
        except orjson.JSONDecodeError:
            logging.error(f"Errore nel parsing JSON del file {zip_file} in {zip_path}")
        except Exception as e:
            logging.error(f"Errore nell'elaborazione del file {zip_file} in {zip_path}: {str(e)}")

    for zip_path in tqdm(get_zip_files(folder_path), desc=f"Analizzando i file ZIP in {expected_type}"):
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                for zip_file in zip_ref.namelist():
                    _process_member(zip_ref, zip_path, zip_file)
        except zipfile.BadZipFile:
            logging.error(f"File ZIP corrotto o non valido: {zip_path}")
        except Exception as e:
            logging.error(f"Errore nell'apertura del file ZIP {zip_path}: {str(e)}")
def get_zip_files(folder_path: str) -> list[str]:
    """Return the sorted ZIP archives under *folder_path*, excluding 'se.zip'."""
    def _keep(path: str) -> bool:
        # Provenance archives ('se.zip') are not entity data and are skipped.
        return os.path.basename(path) != "se.zip"

    matches = collect_files(folder_path, pattern="*.zip", path_filter=_keep)
    return sorted(matches)
def analyze_json(data, resources, zip_path, zip_file, expected_type):
    """Accumulate identifier sets for every entity of *expected_type* in *data*.

    *data* is a JSON-LD document: an iterable of named graphs, each holding
    its entities under "@graph". Per-entity failures are logged and do not
    stop the scan; entities with no declared type are assumed to be of
    *expected_type*.
    """
    for graph in data:
        for entity in graph.get("@graph", []):
            try:
                entity_id = entity["@id"]
                entity_type = get_entity_type(entity)
                if entity_type is None:
                    print(f"Tipo non specificato per l'entità {entity_id} nel file {zip_file} all'interno di {zip_path}. Assumendo tipo {expected_type}.")
                    entity_type = expected_type
                if entity_type != expected_type:
                    continue
                # Merge this entity's identifiers into its accumulated set.
                resources.setdefault(entity_id, set()).update(get_identifiers(entity))
            except KeyError as e:
                logging.error(f"Chiave mancante nell'entità {entity.get('@id', 'ID sconosciuto')} "
                              f"nel file {zip_file} all'interno di {zip_path}: {str(e)}")
            except Exception as e:
                logging.error(f"Errore nell'analisi dell'entità {entity.get('@id', 'ID sconosciuto')} "
                              f"nel file {zip_file} all'interno di {zip_path}: {str(e)}")
def get_entity_type(entity):
    """Map an entity's JSON-LD "@type" list to 'br', 'ra', or None.

    fabio:Expression marks a bibliographic resource and takes precedence;
    foaf:Agent marks a responsible agent; anything else yields None.
    """
    declared_types = entity.get("@type", [])
    if "http://purl.org/spar/fabio/Expression" in declared_types:
        return 'br'
    if "http://xmlns.com/foaf/0.1/Agent" in declared_types:
        return 'ra'
    return None
def get_identifiers(entity):
    """Return the "@id" values of the entity's datacite:hasIdentifier entries.

    Malformed entries (non-dicts, or dicts without "@id") are silently ignored.
    """
    raw_entries = entity.get("http://purl.org/spar/datacite/hasIdentifier", [])
    return [entry["@id"] for entry in raw_entries
            if isinstance(entry, dict) and "@id" in entry]
def save_duplicates_to_csv(resources, csv_path):
    """Write the duplicate groups found in *resources* to *csv_path*.

    Each row holds the group's first (surviving) entity and the remaining
    members joined by '; '. Any failure is logged rather than raised.
    """
    try:
        with open(csv_path, mode='w', newline='', encoding='utf-8') as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow(['surviving_entity', 'merged_entities'])
            for group in find_duplicates(resources):
                survivor, *merged = group
                writer.writerow([survivor, '; '.join(merged)])
    except Exception as e:
        logging.error(f"Errore nel salvataggio del file CSV {csv_path}: {str(e)}")
def find_duplicates(resources):
    """Group entities that transitively share identifiers.

    Entities are clustered with union-find over their identifier strings;
    singleton groups are dropped and each remaining group is returned sorted.
    """
    uf = UnionFind()
    # Link every entity to each of its identifiers so that entities sharing
    # an identifier (directly or transitively) end up in the same set.
    for entity, identifiers in resources.items():
        for identifier in identifiers:
            uf.union(entity, identifier)
    # Bucket the entities by the representative of their set.
    groups = {}
    for entity in resources:
        groups.setdefault(uf.find(entity), []).append(entity)
    # Only sets with more than one entity are genuine duplicates.
    return [sorted(members) for members in groups.values() if len(members) > 1]
def main():
    """Command-line entry point: parse the CLI arguments and run the scan."""
    parser = argparse.ArgumentParser(
        description="Trova risorse duplicate in base ai loro ID.",
        formatter_class=RichHelpFormatter,
    )
    # (name, extra kwargs) for each positional argument, in order.
    argument_specs = [
        ("folder_path",
         {"help": "Percorso della cartella contenente le sottocartelle 'br' e 'ra'"}),
        ("csv_path",
         {"help": "Percorso del file CSV per salvare i duplicati"}),
        ("resource_type",
         {"choices": ['br', 'ra', 'both'],
          "help": "Tipo di risorsa da analizzare: 'br' per risorse bibliografiche, 'ra' per agenti responsabili, 'both' per entrambi"}),
    ]
    for arg_name, arg_kwargs in argument_specs:
        parser.add_argument(arg_name, type=str, **arg_kwargs)
    args = parser.parse_args()
    read_and_analyze_zip_files(args.folder_path, args.csv_path, args.resource_type)
# Script entry point: only run the scan when executed directly, not on import.
if __name__ == "__main__":
    main()