Coverage for oc_meta / run / find / duplicated_entities.py: 0%

132 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1import argparse 

2import csv 

3import json 

4import logging 

5import os 

6import zipfile 

7 

8from rich_argparse import RichHelpFormatter 

9from tqdm import tqdm 

10 

# Route all ERROR-level messages to a persistent log file so long batch
# scans can be audited after the fact instead of printing to the console.
logging.basicConfig(filename='error_log_find_duplicated_resources.txt', level=logging.ERROR,
                    format='%(asctime)s - %(levelname)s - %(message)s')

13 

class UnionFind:
    """Disjoint-set forest with path compression and union by rank.

    Elements are registered lazily: the first time an item reaches
    ``find`` (directly or via ``union``) it becomes its own singleton set.
    """

    def __init__(self):
        # item -> parent item; a root points to itself
        self.parent = {}
        # item -> upper bound on the height of the tree rooted at item
        self.rank = {}

    def find(self, item):
        """Return the representative of *item*'s set, registering it if new."""
        if item not in self.parent:
            self.parent[item] = item
            self.rank[item] = 0
            return item

        # Walk up to the root, then compress the whole path onto it so
        # subsequent lookups are near-constant time.
        root = item
        while self.parent[root] != root:
            root = self.parent[root]
        node = item
        while node != root:
            self.parent[node], node = root, self.parent[node]
        return root

    def union(self, x, y):
        """Merge the sets containing *x* and *y*; no-op if already merged."""
        root_x = self.find(x)
        root_y = self.find(y)
        if root_x == root_y:
            return

        # Attach the shallower tree under the deeper one; on a rank tie the
        # first argument's root becomes the parent and its rank grows by one.
        if self.rank[root_x] < self.rank[root_y]:
            root_x, root_y = root_y, root_x
        self.parent[root_y] = root_x
        if self.rank[root_x] == self.rank[root_y]:
            self.rank[root_x] += 1

43 

def read_and_analyze_zip_files(folder_path, csv_path, resource_type):
    """Scan the selected resource subfolders and write duplicate groups to CSV.

    *resource_type* picks which subfolders of *folder_path* are scanned:
    'br' (bibliographic resources), 'ra' (responsible agents), or 'both'.
    The accumulated entity/identifier map is then dumped via
    ``save_duplicates_to_csv``.
    """
    resources = {}
    # 'br' is always processed before 'ra' when both are requested.
    for kind in ('br', 'ra'):
        if resource_type in (kind, 'both'):
            process_folder(os.path.join(folder_path, kind), resources, kind)
    save_duplicates_to_csv(resources, csv_path)

56 

def process_folder(folder_path, resources, expected_type):
    """Parse every JSON file inside every ZIP under *folder_path*.

    Each parsed document is handed to ``analyze_json`` to accumulate
    identifiers into *resources*.  All failures (missing folder, corrupt
    archives, bad JSON) are logged and skipped so one bad file cannot
    abort the whole scan.
    """
    if not os.path.exists(folder_path):
        logging.error(f"La sottocartella '{expected_type}' non esiste nel percorso: {folder_path}")
        return

    for zip_path in tqdm(get_zip_files(folder_path), desc=f"Analizzando i file ZIP in {expected_type}"):
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                for member in zip_ref.namelist():
                    try:
                        with zip_ref.open(member) as handle:
                            analyze_json(json.load(handle), resources, zip_path, member, expected_type)
                    except json.JSONDecodeError:
                        logging.error(f"Errore nel parsing JSON del file {member} in {zip_path}")
                    except Exception as err:
                        logging.error(f"Errore nell'elaborazione del file {member} in {zip_path}: {str(err)}")
        except zipfile.BadZipFile:
            logging.error(f"File ZIP corrotto o non valido: {zip_path}")
        except Exception as err:
            logging.error(f"Errore nell'apertura del file ZIP {zip_path}: {str(err)}")

80 

def get_zip_files(folder_path):
    """Recursively collect the paths of all ZIP archives under *folder_path*.

    Archives named exactly 'se.zip' are excluded (presumably snapshot/
    provenance archives rather than entity data — confirm with the
    dataset layout).  Returns a list of absolute-or-relative joined paths
    in ``os.walk`` order.
    """
    # Comprehension replaces the manual append loop; `!=` replaces the
    # unidiomatic `not file == 'se.zip'`.
    return [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(folder_path)
        for name in names
        if name.endswith('.zip') and name != 'se.zip'
    ]

88 

def analyze_json(data, resources, zip_path, zip_file, expected_type):
    """Accumulate identifiers of entities matching *expected_type* into *resources*.

    *data* is a parsed JSON-LD document: a list of named graphs, each with an
    "@graph" array of entities.  For every entity of the expected type, the
    datacite identifier IRIs are added to ``resources[entity_id]`` (a set).
    Per-entity failures are logged and do not stop the scan.
    """
    for graph in data:
        for entity in graph.get("@graph", []):
            try:
                entity_id = entity["@id"]
                entity_type = get_entity_type(entity)

                # Untyped entities are assumed to belong to the folder
                # currently being scanned.
                if entity_type is None:
                    print(f"Tipo non specificato per l'entità {entity_id} nel file {zip_file} all'interno di {zip_path}. Assumendo tipo {expected_type}.")
                    entity_type = expected_type

                if entity_type != expected_type:
                    continue
                resources.setdefault(entity_id, set()).update(get_identifiers(entity))
            except KeyError as err:
                logging.error(f"Chiave mancante nell'entità {entity.get('@id', 'ID sconosciuto')} "
                              f"nel file {zip_file} all'interno di {zip_path}: {str(err)}")
            except Exception as err:
                logging.error(f"Errore nell'analisi dell'entità {entity.get('@id', 'ID sconosciuto')} "
                              f"nel file {zip_file} all'interno di {zip_path}: {str(err)}")

112 

def get_entity_type(entity):
    """Map an entity's JSON-LD "@type" list to 'br', 'ra', or None.

    fabio:Expression wins over foaf:Agent when both are present, matching
    the check order below.
    """
    declared = entity.get("@type", [])
    for type_iri, short_code in (
        ("http://purl.org/spar/fabio/Expression", 'br'),
        ("http://xmlns.com/foaf/0.1/Agent", 'ra'),
    ):
        if type_iri in declared:
            return short_code
    return None

119 

def get_identifiers(entity):
    """Return the "@id" of each datacite:hasIdentifier entry of *entity*.

    Non-dict entries and dicts lacking "@id" are silently skipped; a list
    (possibly empty) is always returned.
    """
    # List comprehension replaces the manual append loop (idiom fix);
    # behavior is unchanged.
    return [
        identifier["@id"]
        for identifier in entity.get("http://purl.org/spar/datacite/hasIdentifier", [])
        if isinstance(identifier, dict) and "@id" in identifier
    ]

126 

def save_duplicates_to_csv(resources, csv_path):
    """Write duplicate groups from *resources* to *csv_path* as a 2-column CSV.

    Each row holds the group's first (sorted) entity as 'surviving_entity'
    and the rest joined by '; ' as 'merged_entities'.  Any failure is
    logged rather than raised.
    """
    try:
        with open(csv_path, mode='w', newline='', encoding='utf-8') as out_handle:
            writer = csv.writer(out_handle)
            writer.writerow(['surviving_entity', 'merged_entities'])
            # Star-unpack each group: first entity survives, the rest merge.
            for surviving, *merged in find_duplicates(resources):
                writer.writerow([surviving, '; '.join(merged)])
    except Exception as err:
        logging.error(f"Errore nel salvataggio del file CSV {csv_path}: {str(err)}")

140 

def find_duplicates(resources):
    """Group entities that transitively share at least one identifier.

    *resources* maps entity IRI -> set of identifier IRIs.  Entities are
    unioned with each of their identifiers, then grouped by their final
    disjoint-set representative.  Returns a list of sorted groups, keeping
    only groups with two or more entities.
    """
    forest = UnionFind()

    # Link every entity with each of its identifiers so that entities
    # sharing any identifier end up in the same set.
    for entity, identifiers in resources.items():
        for identifier in identifiers:
            forest.union(entity, identifier)

    # Bucket the entities (not the identifiers) by set representative.
    clusters = {}
    for entity in resources:
        clusters.setdefault(forest.find(entity), []).append(entity)

    # Singleton buckets are not duplicates.
    return [sorted(members) for members in clusters.values() if len(members) > 1]

159 

def main():
    """Parse command-line arguments and launch the duplicate search."""
    parser = argparse.ArgumentParser(
        formatter_class=RichHelpFormatter,
        description="Trova risorse duplicate in base ai loro ID.",
    )
    parser.add_argument(
        "folder_path", type=str,
        help="Percorso della cartella contenente le sottocartelle 'br' e 'ra'")
    parser.add_argument(
        "csv_path", type=str,
        help="Percorso del file CSV per salvare i duplicati")
    parser.add_argument(
        "resource_type", type=str, choices=['br', 'ra', 'both'],
        help="Tipo di risorsa da analizzare: 'br' per risorse bibliografiche, 'ra' per agenti responsabili, 'both' per entrambi")

    arguments = parser.parse_args()
    read_and_analyze_zip_files(arguments.folder_path, arguments.csv_path, arguments.resource_type)

172 

# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()