Coverage for oc_meta/run/merge/group_entities.py: 79%

169 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1import argparse 

2import csv 

3import os 

4import re 

5 

6import pandas as pd 

7import yaml 

8from rich_argparse import RichHelpFormatter 

9from sparqlite import SPARQLClient 

10from tqdm import tqdm 

11 

12 

class UnionFind:
    """Disjoint-set (union-find) structure with path compression.

    Items are registered lazily on first lookup; roots map to themselves
    in ``self.parent``.
    """

    def __init__(self):
        # item -> parent item; an item is a root when it maps to itself.
        self.parent = {}

    def find(self, item):
        """Return the root representative of *item*, registering it if unseen.

        Raises:
            ValueError: if the parent chain contains a cycle (should never
                happen if the structure is only mutated via ``union``).
        """
        if item not in self.parent:
            self.parent[item] = item
            return item

        seen = set()
        trail = []
        node = item
        while self.parent[node] != node:
            if node in seen:
                raise ValueError(f"Cycle detected in union-find structure at {node}")
            seen.add(node)
            trail.append(node)
            node = self.parent[node]

        # Path compression: re-point every traversed node straight at the root.
        for waypoint in trail:
            self.parent[waypoint] = node

        return node

    def union(self, item1, item2):
        """Merge the sets of item1 and item2; item1's root becomes the root."""
        root_a = self.find(item1)
        root_b = self.find(item2)
        if root_a != root_b:
            self.parent[root_b] = root_a

43 

44 

def load_csv(file_path):
    """Read the merge CSV and validate that the mandatory columns are present.

    Args:
        file_path: path to a CSV file.

    Returns:
        pandas.DataFrame with at least 'surviving_entity' and 'merged_entities'.

    Raises:
        ValueError: if any required column is missing.
    """
    frame = pd.read_csv(file_path)
    absent = [name for name in ["surviving_entity", "merged_entities"] if name not in frame.columns]
    if absent:
        raise ValueError(f"CSV file missing required columns: {absent}")
    return frame

52 

53 

def query_sparql_batch(endpoint, uris, batch_size=10):
    """
    Query SPARQL for entities directly linked to the given URIs, in batches.

    Each batch issues one query whose UNION covers every URI both as the
    object (``?entity ?p <uri>``) and as the subject (``<uri> ?p ?entity``).
    Purely typological links (rdf:type, datacite:usesIdentifierScheme,
    pro:withRole) are filtered out.

    Args:
        endpoint: SPARQL endpoint URL
        uris: List of URIs to query
        batch_size: Number of URIs to process in a single query

    Returns:
        Set of all related entity URIs
    """
    found = set()

    with SPARQLClient(endpoint, max_retries=5, backoff_factor=5, timeout=3600) as client:
        for start in range(0, len(uris), batch_size):
            chunk = uris[start:start + batch_size]

            # Subject patterns first, then object patterns, joined by UNION.
            patterns = [f"{{?entity ?p <{uri}>}}" for uri in chunk]
            patterns += [f"{{<{uri}> ?p ?entity}}" for uri in chunk]

            query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    {{
                        {' UNION '.join(patterns)}
                    }}
                    ?entity ?p2 ?o2 .
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}
            """

            response = client.query(query)

            for binding in response['results']['bindings']:
                entity = binding['entity']
                if entity['type'] == 'uri':
                    found.add(entity['value'])

    return found

101 

102 

def get_all_related_entities(endpoint, uris, batch_size=10):
    """
    Get all related entities for a list of URIs using batch queries.

    Args:
        endpoint: SPARQL endpoint URL
        uris: List of URIs to query
        batch_size: Number of URIs to process in a single query

    Returns:
        Set of all related entities including input URIs
    """
    combined = set(uris)
    combined |= query_sparql_batch(endpoint, uris, batch_size)
    return combined

119 

120 

def get_file_path(uri, dir_split, items_per_file, zip_output=True):
    """
    Calculate RDF file path for an entity URI (same logic as MetaEditor.find_file).

    Args:
        uri: Entity URI (e.g., https://w3id.org/oc/meta/br/060100)
        dir_split: Directory split number
        items_per_file: Items per file
        zip_output: Whether files are zipped (default: True)

    Returns:
        File path (e.g., br/060/10000/1000.zip), or None if the URI does not
        match the expected OC Meta entity pattern.
    """
    # Expected shape: .../<two-letter short name>/<supplier prefix><counter>,
    # where the supplier prefix is 0...0 with non-zero digits in between and
    # the counter starts with a non-zero digit (so it is always >= 1).
    entity_regex = r"^.+/([a-z][a-z])/(0[1-9]+0)([1-9][0-9]*)$"
    entity_match = re.match(entity_regex, uri)

    if not entity_match:
        return None

    short_name = entity_match.group(1)
    supplier_prefix = entity_match.group(2)
    cur_number = int(entity_match.group(3))

    # Round cur_number up to the next multiple of each bucket size.
    # Equivalent to the original incremental while-loops (smallest multiple
    # of the size that is >= cur_number) but O(1): ceil(n / size) * size,
    # done in integer arithmetic. Safe because cur_number >= 1 by the regex.
    cur_file_split = ((cur_number + items_per_file - 1) // items_per_file) * items_per_file
    cur_split = ((cur_number + dir_split - 1) // dir_split) * dir_split

    extension = ".zip" if zip_output else ".json"
    return f"{short_name}/{supplier_prefix}/{cur_split}/{cur_file_split}{extension}"

160 

161 

def group_entities(df, endpoint, dir_split=10000, items_per_file=1000, zip_output=True):
    """
    Group entities based on RDF connections and file range conflicts.

    Two merge rows end up in the same group when (a) their entities are
    connected in the triplestore, or (b) any of their entities live in the
    same on-disk RDF file (so the files cannot be processed independently).

    Args:
        df: DataFrame with columns 'surviving_entity' and 'merged_entities'
        endpoint: SPARQL endpoint URL
        dir_split: Directory split number (default: 10000)
        items_per_file: Items per file (default: 1000)
        zip_output: Whether files are zipped (default: True)

    Returns:
        Dict of group_id -> DataFrame with grouped rows
    """
    uf = UnionFind()
    rows_list = []

    for _, row in tqdm(df.iterrows(), total=df.shape[0], desc="Processing rows"):
        surviving_entity = row['surviving_entity']
        # merged_entities is a "; "-separated string of URIs.
        merged_entities = row['merged_entities'].split("; ")

        all_entities = [surviving_entity] + merged_entities

        # Union for RDF connections
        all_related_entities = get_all_related_entities(endpoint, all_entities)
        for entity in all_related_entities:
            uf.union(surviving_entity, entity)

        # Union for file range conflicts (only for IDs being merged, not related entities)
        for entity in all_entities:
            entity_file = get_file_path(entity, dir_split, items_per_file, zip_output)
            if entity_file:
                # Use file path as virtual entity in union-find
                uf.union(surviving_entity, f"FILE:{entity_file}")

        rows_list.append(row)

    # Second pass: bucket the original rows by their union-find root.
    grouped_data = {}
    for row in rows_list:
        surviving_entity = row['surviving_entity']
        group_id = uf.find(surviving_entity)

        if group_id not in grouped_data:
            grouped_data[group_id] = []

        grouped_data[group_id].append(row)

    # Materialize each bucket of Series rows as a DataFrame.
    for group_id in grouped_data:
        grouped_data[group_id] = pd.DataFrame(grouped_data[group_id])

    return grouped_data

213 

214 

def optimize_groups(grouped_data, target_size=50):
    """
    Optimize groups by packing single-row groups together, while keeping
    interconnected (multi-row) groups separate.

    Args:
        grouped_data (dict): Dictionary of grouped DataFrames
        target_size (int): Minimum target size for each group

    Returns:
        dict: Optimized dictionary of grouped DataFrames
    """
    # Partition into singleton groups and multi-row groups.
    singles = {key: frame for key, frame in grouped_data.items() if len(frame) == 1}
    multis = {key: frame for key, frame in grouped_data.items() if len(frame) > 1}

    # Nothing to compact.
    if not singles:
        return grouped_data

    combined = {}
    pending = []
    pending_key = None

    # Pack singletons into batches of target_size, keyed by the first member.
    for key, frame in singles.items():
        if not pending:
            pending_key = key
        pending.append(frame)
        if len(pending) >= target_size:
            combined[pending_key] = pd.concat(pending, ignore_index=True)
            pending = []

    # Handle whatever is left over after packing.
    if pending:
        if len(pending) == 1 and multis:
            # A lone leftover singleton: fold it into the smallest multi-group
            # instead of emitting a one-row group.
            smallest_key, smallest_frame = min(multis.items(), key=lambda kv: len(kv[1]))
            multis[smallest_key] = pd.concat(
                [smallest_frame] + pending,
                ignore_index=True
            )
        else:
            # Otherwise keep the leftovers as their own (undersized) group.
            combined[pending_key] = pd.concat(pending, ignore_index=True)

    # Original multi-row groups first, then the newly combined ones.
    return {**multis, **combined}

268 

269 

def save_grouped_entities(grouped_data, output_dir):
    """Write each group's DataFrame to ``<output_dir>/<last URI segment>.csv``.

    Args:
        grouped_data: dict mapping group id (an entity URI) -> DataFrame.
        output_dir: destination directory, created if missing.

    Saving is best-effort: a failure on one group is reported on stdout and
    does not stop the remaining groups from being written.
    """
    # exist_ok avoids the check-then-create race of exists()+makedirs().
    os.makedirs(output_dir, exist_ok=True)

    for key, df in grouped_data.items():
        # Group ids are URIs; the last path segment names the output file.
        output_file = os.path.join(output_dir, f"{key.split('/')[-1]}.csv")
        print(f"Saving group with {len(df)} rows to {output_file}")

        try:
            df.to_csv(output_file, index=False)
        except AttributeError as e:
            print(f"Error saving file {output_file}: {str(e)}")
            try:
                # Retry with explicit encoding and quoting in case the default
                # serialization path choked on unusual cell values.
                df.to_csv(output_file, index=False, encoding='utf-8', quoting=csv.QUOTE_NONNUMERIC)
                print(f"Successfully saved using alternative method: {output_file}")
            except Exception as alt_e:
                print(f"Alternative method also failed: {str(alt_e)}")
        except Exception as e:
            print(f"Unexpected error saving file {output_file}: {str(e)}")

289 

290 

def main():
    """CLI entry point: load config and CSV, group entities, optimize, save."""
    parser = argparse.ArgumentParser(
        description='Process CSV and group entities based on SPARQL queries.',
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument('csv_file_path', type=str, help='Path to the input CSV file')
    parser.add_argument('output_dir', type=str, help='Directory to save the output files')
    parser.add_argument('meta_config', type=str, help='Path to meta configuration YAML file')
    parser.add_argument('--min_group_size', type=int, default=50,
                        help='Minimum target size for groups (default: 50)')
    args = parser.parse_args()

    # Pull the storage layout and endpoint settings from the Meta config.
    with open(args.meta_config, 'r', encoding='utf-8') as config_file:
        settings = yaml.safe_load(config_file)

    sparql_endpoint = settings['triplestore_url']
    dir_split = settings['dir_split_number']
    items_per_file = settings['items_per_file']
    zip_output = settings['zip_output_rdf']

    df = load_csv(args.csv_file_path)
    print(f"Loaded CSV file with {len(df)} rows")
    print(f"Configuration: dir_split={dir_split}, items_per_file={items_per_file}, zip_output={zip_output}")

    grouped_entities = group_entities(df, sparql_endpoint, dir_split, items_per_file, zip_output)
    print(f"Initially grouped entities into {len(grouped_entities)} groups")

    optimized_groups = optimize_groups(grouped_entities, args.min_group_size)
    print(f"Optimized into {len(optimized_groups)} groups")

    save_grouped_entities(optimized_groups, args.output_dir)
    print("Finished saving grouped entities")

324 

325 

# Allow running this module directly as a script.
if __name__ == "__main__":
    main()