Coverage for oc_meta / run / merge / group_entities.py: 79%

169 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import csv 

7import os 

8import re 

9 

10import pandas as pd 

11import yaml 

12from rich_argparse import RichHelpFormatter 

13from sparqlite import SPARQLClient 

14from tqdm import tqdm 

15 

16 

class UnionFind:
    """Disjoint-set (union-find) structure with path compression.

    Items are created lazily on first lookup; any hashable value can be
    used as an item (entity URIs and "FILE:..." virtual nodes here).
    """

    def __init__(self):
        # Maps each item to its parent; a root maps to itself.
        self.parent = {}

    def find(self, item):
        """Return the root representative of *item*, registering it on first use.

        Applies path compression on the way back. Raises ValueError if the
        parent chain contains a cycle (which would otherwise loop forever).
        """
        if item not in self.parent:
            self.parent[item] = item
            return item

        seen = set()
        trail = []
        node = item

        # Walk up to the root, remembering every node we pass through.
        while node != self.parent[node]:
            if node in seen:
                raise ValueError(f"Cycle detected in union-find structure at {node}")
            seen.add(node)
            trail.append(node)
            node = self.parent[node]

        # Path compression: re-point every visited node directly at the root.
        for visited in trail:
            self.parent[visited] = node

        return node

    def union(self, item1, item2):
        """Merge the sets containing *item1* and *item2* (item1's root wins)."""
        root_a = self.find(item1)
        root_b = self.find(item2)
        if root_a != root_b:
            self.parent[root_b] = root_a

48 

def load_csv(file_path):
    """Load the merge CSV and validate that the expected columns are present.

    Args:
        file_path: Path (or file-like object) accepted by pandas.read_csv.

    Returns:
        The loaded DataFrame.

    Raises:
        ValueError: when 'surviving_entity' or 'merged_entities' is missing.
    """
    data = pd.read_csv(file_path)
    expected = ("surviving_entity", "merged_entities")
    missing_columns = [name for name in expected if name not in data.columns]
    if missing_columns:
        raise ValueError(f"CSV file missing required columns: {missing_columns}")
    return data

56 

57 

def query_sparql_batch(endpoint, uris, batch_size=10):
    """
    Query SPARQL for related entities in batches.

    Args:
        endpoint: SPARQL endpoint URL
        uris: List of URIs to query
        batch_size: Number of URIs to process in a single query

    Returns:
        Set of all related entities (URI strings only; literals and blank
        nodes in the result bindings are skipped)
    """
    related_entities = set()

    # Long timeout and retry/backoff: these UNION-heavy queries can be slow
    # on a large triplestore. NOTE(review): assumes SPARQLClient supports the
    # context-manager protocol — confirm against the sparqlite API.
    with SPARQLClient(endpoint, max_retries=5, backoff_factor=5, timeout=3600) as client:
        for i in range(0, len(uris), batch_size):
            batch_uris = uris[i:i + batch_size]

            # For each URI, match neighbors in both directions: triples where
            # the URI is the object (subject_clauses) and where it is the
            # subject (object_clauses).
            subject_clauses = []
            object_clauses = []

            for uri in batch_uris:
                subject_clauses.append(f"{{?entity ?p <{uri}>}}")
                object_clauses.append(f"{{<{uri}> ?p ?entity}}")

            # FILTERs exclude structural predicates (rdf:type, identifier
            # scheme, role) that would otherwise link unrelated entities.
            # '?entity ?p2 ?o2' keeps only entities that have at least one
            # outgoing triple of their own.
            query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    {{
                        {' UNION '.join(subject_clauses + object_clauses)}
                    }}
                    ?entity ?p2 ?o2 .
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}
            """

            results = client.query(query)

            # Collect only URI bindings from the SPARQL JSON result format.
            for result in results['results']['bindings']:
                if result['entity']['type'] == 'uri':
                    related_entities.add(result['entity']['value'])

    return related_entities

105 

106 

def get_all_related_entities(endpoint, uris, batch_size=10):
    """
    Get all related entities for a list of URIs using batch queries.

    Args:
        endpoint: SPARQL endpoint URL
        uris: List of URIs to query
        batch_size: Number of URIs to process in a single query

    Returns:
        Set of all related entities including input URIs
    """
    # Seed the result with the inputs themselves, then add everything the
    # triplestore reports as connected to them.
    found = set(uris)
    found.update(query_sparql_batch(endpoint, uris, batch_size))
    return found

123 

124 

def get_file_path(uri, dir_split, items_per_file, zip_output=True):
    """
    Calculate RDF file path for an entity URI (same logic as MetaEditor.find_file).

    Args:
        uri: Entity URI (e.g., https://w3id.org/oc/meta/br/060100)
        dir_split: Directory split number
        items_per_file: Items per file
        zip_output: Whether files are zipped (default: True)

    Returns:
        File path (e.g., br/060/10000/1000.zip) or None if invalid URI
    """
    entity_regex = r"^.+/([a-z][a-z])/(0[1-9]+0)([1-9][0-9]*)$"
    entity_match = re.match(entity_regex, uri)

    if not entity_match:
        return None

    short_name = entity_match.group(1)
    supplier_prefix = entity_match.group(2)
    cur_number = int(entity_match.group(3))

    # Round cur_number up to the next multiple of each bucket size using
    # ceiling division (-(-a // b) * b). The regex guarantees cur_number >= 1,
    # so both results are strictly positive — identical to the original
    # increment-until-covered loops, but O(1) instead of O(n/bucket).
    cur_file_split = -(-cur_number // items_per_file) * items_per_file
    cur_split = -(-cur_number // dir_split) * dir_split

    extension = ".zip" if zip_output else ".json"
    return f"{short_name}/{supplier_prefix}/{cur_split}/{cur_file_split}{extension}"

164 

165 

def group_entities(df, endpoint, dir_split=10000, items_per_file=1000, zip_output=True):
    """
    Group entities based on RDF connections and file range conflicts.

    Args:
        df: DataFrame with columns 'surviving_entity' and 'merged_entities'
        endpoint: SPARQL endpoint URL
        dir_split: Directory split number (default: 10000)
        items_per_file: Items per file (default: 1000)
        zip_output: Whether files are zipped (default: True)

    Returns:
        Dict of group_id -> DataFrame with grouped rows
    """
    uf = UnionFind()
    processed_rows = []

    for _, row in tqdm(df.iterrows(), total=df.shape[0], desc="Processing rows"):
        survivor = row['surviving_entity']
        merged = row['merged_entities'].split("; ")
        batch = [survivor] + merged

        # Entities connected in the triplestore must land in the same group.
        for related in get_all_related_entities(endpoint, batch):
            uf.union(survivor, related)

        # Entities whose RDF data lives in the same file must also share a
        # group; the file path acts as a virtual node in the union-find.
        # Only the IDs being merged are considered here, not related entities.
        for entity in batch:
            file_for_entity = get_file_path(entity, dir_split, items_per_file, zip_output)
            if file_for_entity:
                uf.union(survivor, f"FILE:{file_for_entity}")

        processed_rows.append(row)

    # Bucket rows by their union-find representative.
    buckets = {}
    for row in processed_rows:
        group_id = uf.find(row['surviving_entity'])
        buckets.setdefault(group_id, []).append(row)

    return {group_id: pd.DataFrame(rows) for group_id, rows in buckets.items()}

217 

218 

def optimize_groups(grouped_data, target_size=50):
    """
    Optimize groups by packing single-row groups together while keeping
    interconnected (multi-row) groups separate.

    Args:
        grouped_data (dict): Dictionary of grouped DataFrames
        target_size (int): Minimum target size for each combined group

    Returns:
        dict: Optimized dictionary of grouped DataFrames
    """
    # Split the groups into singletons and interconnected (multi-row) ones.
    singletons = {k: v for k, v in grouped_data.items() if len(v) == 1}
    interconnected = {k: v for k, v in grouped_data.items() if len(v) > 1}

    # Nothing to pack: keep the original grouping untouched.
    if not singletons:
        return grouped_data

    packed = {}
    pending = []
    pending_key = None

    # Accumulate singleton DataFrames until the target size is reached, then
    # flush them as one combined group keyed by its first member.
    for key, frame in singletons.items():
        if not pending:
            pending_key = key

        pending.append(frame)

        if len(pending) >= target_size:
            packed[pending_key] = pd.concat(pending, ignore_index=True)
            pending = []

    # Handle whatever did not reach the target size.
    if pending:
        if len(pending) == 1 and interconnected:
            # A lone leftover singleton is folded into the smallest
            # interconnected group rather than becoming a tiny group.
            smallest_key, smallest_frame = min(
                interconnected.items(), key=lambda kv: len(kv[1])
            )
            interconnected[smallest_key] = pd.concat(
                [smallest_frame] + pending,
                ignore_index=True,
            )
        else:
            # Otherwise keep the remainder as its own (undersized) group.
            packed[pending_key] = pd.concat(pending, ignore_index=True)

    # Merge the original multi-row groups with the newly packed ones.
    return {**interconnected, **packed}

272 

273 

def save_grouped_entities(grouped_data, output_dir):
    """Write each grouped DataFrame to '<output_dir>/<last URI segment>.csv'.

    Errors are reported to stdout; an AttributeError triggers one retry with
    explicit encoding and quoting before giving up on that file.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for key, frame in grouped_data.items():
        # Group keys are URIs; the trailing path segment names the file.
        target = os.path.join(output_dir, f"{key.split('/')[-1]}.csv")
        print(f"Saving group with {len(frame)} rows to {target}")

        try:
            frame.to_csv(target, index=False)
        except AttributeError as e:
            print(f"Error saving file {target}: {str(e)}")
            try:
                # Retry with explicit encoding and non-numeric quoting.
                frame.to_csv(target, index=False, encoding='utf-8', quoting=csv.QUOTE_NONNUMERIC)
                print(f"Successfully saved using alternative method: {target}")
            except Exception as alt_e:
                print(f"Alternative method also failed: {str(alt_e)}")
        except Exception as e:
            print(f"Unexpected error saving file {target}: {str(e)}")

293 

294 

def main():
    """CLI entry point: load the merge CSV, group entities via SPARQL,
    pack small groups, and write one CSV per group to the output directory.

    The meta configuration YAML must provide 'triplestore_url',
    'dir_split_number', 'items_per_file' and 'zip_output_rdf' (a KeyError is
    raised if any is missing).
    """
    parser = argparse.ArgumentParser(
        description='Process CSV and group entities based on SPARQL queries.',
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument('csv_file_path', type=str, help='Path to the input CSV file')
    parser.add_argument('output_dir', type=str, help='Directory to save the output files')
    parser.add_argument('meta_config', type=str, help='Path to meta configuration YAML file')
    parser.add_argument('--min_group_size', type=int, default=50,
                        help='Minimum target size for groups (default: 50)')

    args = parser.parse_args()

    # Read file-layout and endpoint settings from the Meta configuration.
    with open(args.meta_config, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    sparql_endpoint = config['triplestore_url']
    dir_split = config['dir_split_number']
    items_per_file = config['items_per_file']
    zip_output = config['zip_output_rdf']

    df = load_csv(args.csv_file_path)
    print(f"Loaded CSV file with {len(df)} rows")
    print(f"Configuration: dir_split={dir_split}, items_per_file={items_per_file}, zip_output={zip_output}")

    # Grouping is driven by RDF connectivity and file-range conflicts.
    grouped_entities = group_entities(df, sparql_endpoint, dir_split, items_per_file, zip_output)
    print(f"Initially grouped entities into {len(grouped_entities)} groups")

    # Pack singleton groups up to the requested minimum size.
    optimized_groups = optimize_groups(grouped_entities, args.min_group_size)
    print(f"Optimized into {len(optimized_groups)} groups")

    save_grouped_entities(optimized_groups, args.output_dir)
    print("Finished saving grouped entities")

328 

329 

# Script entry point.
if __name__ == "__main__":
    main()