Coverage for oc_meta / run / find / duplicated_entities.py: 0%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import csv 

7import logging 

8import os 

9import zipfile 

10 

11import orjson 

12from rich_argparse import RichHelpFormatter 

13from tqdm import tqdm 

14 

15from oc_meta.lib.file_manager import collect_files 

16 

# Route all ERROR-level messages from this script to a dedicated log file
# (ZIP/JSON parsing problems are logged and skipped rather than aborting the run).
logging.basicConfig(filename='error_log_find_duplicated_resources.txt', level=logging.ERROR,
                    format='%(asctime)s - %(levelname)s - %(message)s')

19 

class UnionFind:
    """Disjoint-set (union-find) structure with path compression and union by rank."""

    def __init__(self):
        # item -> parent item; roots point to themselves
        self.parent = {}
        # root item -> upper bound on its tree depth
        self.rank = {}

    def find(self, item):
        """Return the representative of *item*'s set, registering unseen items lazily."""
        if item not in self.parent:
            self.parent[item] = item
            self.rank[item] = 0
            return item

        root = self.parent[item]
        if root != item:
            root = self.find(root)
            self.parent[item] = root  # path compression: re-point directly to the root
        return root

    def union(self, x, y):
        """Merge the sets containing *x* and *y* (no-op if already merged)."""
        root_x = self.find(x)
        root_y = self.find(y)

        if root_x == root_y:
            return

        # Ensure root_x is the deeper (or equally deep) tree, then attach the other under it.
        if self.rank[root_x] < self.rank[root_y]:
            root_x, root_y = root_y, root_x
        self.parent[root_y] = root_x
        # Depth only grows when both trees had equal rank.
        if self.rank[root_x] == self.rank[root_y]:
            self.rank[root_x] += 1

49 

def read_and_analyze_zip_files(folder_path, csv_path, resource_type):
    """Scan the requested entity subfolders for duplicates and save them to CSV.

    :param folder_path: root folder expected to contain the 'br' and/or 'ra' subfolders
    :param csv_path: destination CSV file for the duplicate groups
    :param resource_type: 'br', 'ra', or 'both'
    """
    resources = {}

    # Process 'br' first, then 'ra', mirroring the CLI's resource_type choices.
    for subfolder in ('br', 'ra'):
        if resource_type in (subfolder, 'both'):
            process_folder(os.path.join(folder_path, subfolder), resources, subfolder)

    save_duplicates_to_csv(resources, csv_path)

62 

def process_folder(folder_path, resources, expected_type):
    """Walk every ZIP archive under *folder_path* and collect entity identifiers.

    Problems (missing folder, corrupt archive, invalid JSON member) are logged
    to the error log and the offending item is skipped.
    """
    if not os.path.exists(folder_path):
        logging.error(f"La sottocartella '{expected_type}' non esiste nel percorso: {folder_path}")
        return

    for zip_path in tqdm(get_zip_files(folder_path), desc=f"Analizzando i file ZIP in {expected_type}"):
        try:
            with zipfile.ZipFile(zip_path, 'r') as archive:
                for zip_file in archive.namelist():
                    try:
                        with archive.open(zip_file) as member:
                            payload = orjson.loads(member.read())
                        analyze_json(payload, resources, zip_path, zip_file, expected_type)
                    except orjson.JSONDecodeError:
                        logging.error(f"Errore nel parsing JSON del file {zip_file} in {zip_path}")
                    except Exception as e:
                        logging.error(f"Errore nell'elaborazione del file {zip_file} in {zip_path}: {str(e)}")
        except zipfile.BadZipFile:
            logging.error(f"File ZIP corrotto o non valido: {zip_path}")
        except Exception as e:
            logging.error(f"Errore nell'apertura del file ZIP {zip_path}: {str(e)}")

86 

def get_zip_files(folder_path: str) -> list[str]:
    """Return the sorted ZIP archives found under *folder_path*, skipping 'se.zip'."""
    def _is_wanted(path: str) -> bool:
        # 'se.zip' archives (snapshot entities) are deliberately excluded.
        return os.path.basename(path) != "se.zip"

    found = collect_files(folder_path, pattern="*.zip", path_filter=_is_wanted)
    return sorted(found)

93 

def analyze_json(data, resources, zip_path, zip_file, expected_type):
    """Accumulate the identifiers of every entity matching *expected_type*.

    *resources* maps entity IRI -> set of identifier IRIs and is updated in place.
    Entities without an explicit @type are assumed to belong to the folder
    being scanned; per-entity errors are logged and the entity is skipped.
    """
    for graph in data:
        for entity in graph.get("@graph", []):
            try:
                entity_id = entity["@id"]
                entity_type = get_entity_type(entity)

                if entity_type is None:
                    print(f"Tipo non specificato per l'entità {entity_id} nel file {zip_file} all'interno di {zip_path}. Assumendo tipo {expected_type}.")
                    entity_type = expected_type

                if entity_type != expected_type:
                    continue

                resources.setdefault(entity_id, set()).update(get_identifiers(entity))
            except KeyError as e:
                logging.error(f"Chiave mancante nell'entità {entity.get('@id', 'ID sconosciuto')} "
                              f"nel file {zip_file} all'interno di {zip_path}: {str(e)}")
            except Exception as e:
                logging.error(f"Errore nell'analisi dell'entità {entity.get('@id', 'ID sconosciuto')} "
                              f"nel file {zip_file} all'interno di {zip_path}: {str(e)}")

117 

def get_entity_type(entity):
    """Classify a JSON-LD entity by its @type values.

    Returns 'br' for fabio:Expression, 'ra' for foaf:Agent, otherwise None.
    """
    declared_types = entity.get("@type", [])
    if "http://purl.org/spar/fabio/Expression" in declared_types:
        return 'br'
    if "http://xmlns.com/foaf/0.1/Agent" in declared_types:
        return 'ra'
    return None

124 

def get_identifiers(entity):
    """Collect the @id of each datacite:hasIdentifier object attached to *entity*.

    Non-dict entries and dicts without an @id key are silently ignored.
    """
    return [
        ident["@id"]
        for ident in entity.get("http://purl.org/spar/datacite/hasIdentifier", [])
        if isinstance(ident, dict) and "@id" in ident
    ]

131 

def save_duplicates_to_csv(resources, csv_path):
    """Write each duplicate group to *csv_path* as 'surviving_entity,merged_entities'.

    The lexicographically smallest entity of a group survives; the others are
    listed as '; '-joined merge candidates. Any write failure is logged.
    """
    try:
        with open(csv_path, mode='w', newline='', encoding='utf-8') as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow(['surviving_entity', 'merged_entities'])

            for group in find_duplicates(resources):
                survivor, merged = group[0], group[1:]
                writer.writerow([survivor, '; '.join(merged)])
    except Exception as e:
        logging.error(f"Errore nel salvataggio del file CSV {csv_path}: {str(e)}")

145 

def find_duplicates(resources):
    """Group entities that transitively share at least one identifier.

    :param resources: mapping of entity IRI -> set of identifier IRIs
    :return: list of sorted groups, each containing two or more entity IRIs
    """
    uf = UnionFind()

    # Link each entity to all of its identifiers, so entities sharing an
    # identifier (directly or via a chain) end up in the same component.
    for entity, identifiers in resources.items():
        for identifier in identifiers:
            uf.union(entity, identifier)

    # Bucket entities by the representative of their connected component.
    clusters = {}
    for entity in resources:
        clusters.setdefault(uf.find(entity), []).append(entity)

    # Only components containing more than one entity are duplicates.
    return [sorted(members) for members in clusters.values() if len(members) > 1]

164 

def main():
    """Command-line entry point: parse the arguments and launch the duplicate scan."""
    arg_parser = argparse.ArgumentParser(
        description="Trova risorse duplicate in base ai loro ID.",
        formatter_class=RichHelpFormatter,
    )
    arg_parser.add_argument("folder_path", type=str, help="Percorso della cartella contenente le sottocartelle 'br' e 'ra'")
    arg_parser.add_argument("csv_path", type=str, help="Percorso del file CSV per salvare i duplicati")
    arg_parser.add_argument("resource_type", type=str, choices=['br', 'ra', 'both'],
                            help="Tipo di risorsa da analizzare: 'br' per risorse bibliografiche, 'ra' per agenti responsabili, 'both' per entrambi")
    namespace = arg_parser.parse_args()

    read_and_analyze_zip_files(namespace.folder_path, namespace.csv_path, namespace.resource_type)


if __name__ == "__main__":
    main()