Coverage for oc_meta/run/merge/entities.py: 84%

205 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

import argparse
import concurrent.futures
import csv
import logging
import os
import traceback
from typing import Dict, List, Set

from oc_ocdm.graph import GraphSet
from rdflib import URIRef
from rich_argparse import RichHelpFormatter
from sparqlite import SPARQLClient
from tqdm import tqdm

from oc_meta.core.editor import MetaEditor

# Configure root logging once for the whole module; worker processes
# inherit this configuration when they re-import the module.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

23 

24 

class EntityMerger:
    """Merge duplicate OpenCitations Meta entities listed in CSV files.

    Each input CSV row names one surviving entity and a "; "-separated
    list of merged entities.  Files are processed by a pool of worker
    processes, and every handled row is flagged Done = "True" so that an
    interrupted run can be resumed without repeating work.
    """

    def __init__(
        self,
        meta_config: str,
        resp_agent: str,
        entity_types: List[str],
        stop_file_path: str,
        workers: int,
    ):
        """
        Args:
            meta_config: Path/string passed to MetaEditor as configuration.
            resp_agent: URI of the responsible agent for provenance.
            entity_types: OC Meta type codes to process (e.g. "ra", "br", "id").
            stop_file_path: Path whose existence signals a graceful stop.
            workers: Number of parallel worker processes.
        """
        self.meta_config = meta_config
        self.resp_agent = resp_agent
        self.entity_types = entity_types
        self.stop_file_path = stop_file_path
        self.workers = workers
        self.batch_size = 10  # entities per SPARQL / import batch

    @staticmethod
    def get_entity_type(entity_url: str) -> str | None:
        """Return the OC Meta type code of an entity URL.

        The type code is the path segment following "meta" (e.g. "br" in
        .../oc/meta/br/06107).  Returns None when the URL is not an OC
        Meta URL or no segment follows "meta".
        """
        parts = entity_url.split("/")
        if "oc" in parts and "meta" in parts:
            try:
                return parts[parts.index("meta") + 1]
            except IndexError:
                # "meta" is the last segment: no type code follows.
                return None
        return None

    @staticmethod
    def read_csv(csv_file: str) -> List[Dict]:
        """Read csv_file into a list of dict rows.

        Rows lacking a "Done" column get Done="False" so downstream code
        can rely on the flag being present.
        """
        data = []
        with open(csv_file, mode="r", newline="", encoding="utf-8") as file:
            csv_reader = csv.DictReader(file)
            for row in csv_reader:
                if "Done" not in row:
                    row["Done"] = "False"
                data.append(row)
        return data

    @staticmethod
    def write_csv(csv_file: str, data: List[Dict]):
        """Write data back to csv_file, using the first row's keys as header.

        A no-op for empty data (the original raised IndexError on data[0]).
        """
        if not data:
            return
        fieldnames = data[0].keys()
        with open(csv_file, mode="w", newline="", encoding="utf-8") as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)

    @staticmethod
    def count_csv_rows(csv_file: str) -> int:
        """Return the number of data rows (physical lines minus the header).

        Clamped to zero so an empty file no longer reports -1.
        """
        with open(csv_file, "r", encoding="utf-8") as f:
            return max(0, sum(1 for _ in f) - 1)

    @staticmethod
    def _build_related_query(clauses: List[str]) -> str:
        """Build the SELECT that discovers entities related through any
        predicate except rdf:type, datacite:usesIdentifierScheme and
        pro:withRole."""
        return f"""
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX datacite: <http://purl.org/spar/datacite/>
        PREFIX pro: <http://purl.org/spar/pro/>
        SELECT DISTINCT ?entity WHERE {{
            {{
                {' UNION '.join(clauses)}
            }}
            FILTER (?p != rdf:type)
            FILTER (?p != datacite:usesIdentifierScheme)
            FILTER (?p != pro:withRole)
        }}
        """

    def _collect_related_for_batches(
        self,
        client,
        meta_editor: MetaEditor,
        entities: List[str],
        batch_size: int,
        include_incoming: bool,
        label: str,
        sink: Set[URIRef],
    ) -> None:
        """Query related entities for `entities` in batches, adding every
        hit to `sink` and to meta_editor.relationship_cache.

        Args:
            client: Open SPARQLClient to query against.
            meta_editor: MetaEditor whose relationship_cache is populated.
            entities: Entity URLs to look up.
            batch_size: Entities per SPARQL query.
            include_incoming: Also match `?x ?p <entity>` triples (used for
                merged entities; surviving entities match as subject only).
            label: Short tag ("merged"/"surviving") for error messages.
            sink: Accumulator for all related URIRefs found.
        """
        for i in range(0, len(entities), batch_size):
            batch = entities[i : i + batch_size]
            clauses = []
            for entity in batch:
                if include_incoming:
                    clauses.append(f"{{?entity ?p <{entity}>}}")
                clauses.append(f"{{<{entity}> ?p ?entity}}")

            if not clauses:
                continue

            try:
                results = client.query(self._build_related_query(clauses))
                batch_related = {
                    URIRef(binding["entity"]["value"])
                    for binding in results["results"]["bindings"]
                    if binding["entity"]["type"] == "uri"
                }
                sink.update(batch_related)

                # Cache the batch's whole result set for each entity of the
                # batch.  This over-approximates per-entity relations (a
                # batch mixes entities), but fixes the original bug where
                # only the last binding was cached for every entity — and
                # where an empty result set left `related_uri` unbound,
                # raising a silently-caught NameError.
                for entity in batch:
                    meta_editor.relationship_cache.setdefault(
                        URIRef(entity), set()
                    ).update(batch_related)
            except Exception as e:
                # logger instead of print, for consistency with the module.
                logger.error(
                    "Error fetching related entities for %s batch %d-%d: %s",
                    label,
                    i,
                    i + batch_size,
                    e,
                )

    def fetch_related_entities_batch(
        self,
        meta_editor: MetaEditor,
        merged_entities: List[str],
        surviving_entities: List[str],
        batch_size: int = 10,
    ) -> Set[URIRef]:
        """
        Fetch all related entities in batches and populate the relationship cache.

        Args:
            meta_editor: MetaEditor instance
            merged_entities: List of entities to be merged (matched both as
                subject and as object of the relating triple)
            surviving_entities: List of surviving entities (matched as
                subject only)
            batch_size: Maximum number of entities to process in a single SPARQL query

        Returns:
            Set of all related entities
        """
        all_related_entities: Set[URIRef] = set()

        with SPARQLClient(
            meta_editor.endpoint, max_retries=5, backoff_factor=0.3, timeout=3600
        ) as client:
            self._collect_related_for_batches(
                client,
                meta_editor,
                merged_entities,
                batch_size,
                include_incoming=True,
                label="merged",
                sink=all_related_entities,
            )
            self._collect_related_for_batches(
                client,
                meta_editor,
                surviving_entities,
                batch_size,
                include_incoming=False,
                label="surviving",
                sink=all_related_entities,
            )

        return all_related_entities

    def should_stop_processing(self) -> bool:
        """Return True when the stop file exists (operator requested a halt)."""
        return os.path.exists(self.stop_file_path)

    def process_file(self, csv_file: str) -> str:
        """Process a single CSV file with cross-row batch processing.

        Collects every pending row of a requested entity type, prefetches
        all related entities in batches, imports the needed entities into
        one GraphSet, performs the merges, marks the rows Done, and
        persists both the graph and the CSV.

        Returns:
            The processed file path (also returned on early exit).
        """
        logger.info(f"Starting to process file: {csv_file}")
        data = self.read_csv(csv_file)
        logger.info(f"Read {len(data)} rows from {csv_file}")
        meta_editor = MetaEditor(self.meta_config, self.resp_agent, save_queries=True)
        modified = False

        if self.should_stop_processing():
            logger.info("Stop file detected, halting processing")
            return csv_file

        g_set = GraphSet(
            meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler
        )

        # Gather all surviving/merged entities across rows so they can be
        # fetched and imported in batches instead of row by row.
        batch_merged_entities = []
        batch_surviving_entities = []
        rows_to_process = []

        for row in data:
            if row.get("Done") == "True":
                continue

            entity_type = self.get_entity_type(row["surviving_entity"])
            if entity_type in self.entity_types:
                surviving_entity = row["surviving_entity"]
                merged_entities = row["merged_entities"].split("; ")
                batch_surviving_entities.append(surviving_entity)
                batch_merged_entities.extend(merged_entities)
                rows_to_process.append((surviving_entity, merged_entities))

        if not rows_to_process:
            logger.info(f"No rows to process in {csv_file}")
            return csv_file

        logger.info(f"Found {len(rows_to_process)} rows to process in {csv_file}")
        logger.info(
            f"Fetching related entities for {len(batch_merged_entities)} merged entities and {len(batch_surviving_entities)} surviving entities"
        )

        all_related_entities = self.fetch_related_entities_batch(
            meta_editor,
            batch_merged_entities,
            batch_surviving_entities,
            self.batch_size,
        )
        logger.info(f"Found {len(all_related_entities)} related entities")

        # Import related + surviving + merged entities, skipping anything
        # already present in the editor's entity cache.
        entities_to_import = all_related_entities.copy()
        entities_to_import.update(URIRef(e) for e in batch_surviving_entities)
        entities_to_import.update(URIRef(e) for e in batch_merged_entities)

        entities_to_import = {
            e for e in entities_to_import if not meta_editor.entity_cache.is_cached(e)
        }

        if entities_to_import:
            logger.info(f"Importing {len(entities_to_import)} new entities")
            try:
                meta_editor.reader.import_entities_from_triplestore(
                    g_set=g_set,
                    ts_url=meta_editor.endpoint,
                    entities=list(entities_to_import),
                    resp_agent=meta_editor.resp_agent,
                    enable_validation=False,
                    batch_size=self.batch_size,
                )

                for entity in entities_to_import:
                    meta_editor.entity_cache.add(entity)
                logger.info("Entity import completed successfully")

            except ValueError as e:
                # Import failure is non-fatal: the merges below may still
                # succeed for entities already in the graph set.
                logger.error(f"Error importing entities: {e}")
                modified = True

        processed_count = 0
        for surviving_entity, merged_entities in rows_to_process:
            logger.info(f"Processing row - surviving entity: {surviving_entity}")
            surviving_uri = URIRef(surviving_entity)
            for merged_entity in merged_entities:
                logger.info(
                    f"  Attempting to merge {merged_entity} into {surviving_entity}"
                )
                try:
                    meta_editor.merge(g_set, surviving_uri, URIRef(merged_entity))
                    modified = True
                    processed_count += 1
                    logger.info(f"  Successfully merged {merged_entity}")
                except ValueError as e:
                    # One failed merge must not abort the rest of the row.
                    logger.error(
                        f"Error merging {merged_entity} into {surviving_entity}: {e}"
                    )
                    continue
            logger.info(
                f"Completed processing row with surviving entity: {surviving_entity}"
            )

        logger.info(f"Successfully processed {processed_count} merges")

        if modified:
            # Mark every row of a requested type as done — including rows
            # whose individual merges failed, so they are not retried.
            marked_done = 0
            for row in data:
                if (
                    row.get("Done") != "True"
                    and self.get_entity_type(row["surviving_entity"])
                    in self.entity_types
                ):
                    row["Done"] = "True"
                    marked_done += 1

            logger.info(f"Marked {marked_done} rows as done")
            meta_editor.save(g_set)
            self.write_csv(csv_file, data)
            logger.info(f"Saved changes to {csv_file}")

        return csv_file

    def process_folder(self, csv_folder: str):
        """Process every .csv file in csv_folder with a pool of workers.

        A pre-existing stop file is removed first so a stop request from a
        previous run does not cancel this one.  With more than 4 workers,
        files over 10000 data rows are skipped — NOTE(review): presumably
        to bound per-worker memory/time; confirm with the maintainers.
        """
        if os.path.exists(self.stop_file_path):
            os.remove(self.stop_file_path)

        csv_files = [
            os.path.join(csv_folder, file)
            for file in os.listdir(csv_folder)
            if file.endswith(".csv")
        ]

        if self.workers > 4:
            csv_files = [
                file for file in csv_files if self.count_csv_rows(file) <= 10000
            ]

        with concurrent.futures.ProcessPoolExecutor(
            max_workers=self.workers
        ) as executor:
            futures = {}
            for csv_file in csv_files:
                if self.should_stop_processing():
                    break
                futures[executor.submit(self.process_file, csv_file)] = csv_file

            for future in tqdm(
                concurrent.futures.as_completed(futures),
                total=len(futures),
                desc="Overall Progress",
            ):
                csv_file = futures[future]
                try:
                    future.result()
                except Exception as e:
                    # logger instead of print, for consistency with the
                    # rest of the module; include the full traceback.
                    logger.error(
                        "Error processing file %s\nType: %s\nDetails: %s\nFull Traceback:\n%s",
                        csv_file,
                        type(e).__name__,
                        e,
                        traceback.format_exc(),
                    )

350 

351 

def main():
    """Command-line entry point: merge entities from CSV files in a folder."""
    arg_parser = argparse.ArgumentParser(
        description="Merge entities from CSV files in a folder.",
        formatter_class=RichHelpFormatter,
    )
    arg_parser.add_argument(
        "csv_folder", type=str, help="Path to the folder containing CSV files"
    )
    arg_parser.add_argument("meta_config", type=str, help="Meta configuration string")
    arg_parser.add_argument("resp_agent", type=str, help="Responsible agent string")
    arg_parser.add_argument(
        "--entity_types",
        nargs="+",
        default=["ra", "br", "id"],
        help="Types of entities to merge (ra, br, id)",
    )
    arg_parser.add_argument(
        "--stop_file", type=str, default="stop.out", help="Path to the stop file"
    )
    arg_parser.add_argument(
        "--workers", type=int, default=4, help="Number of parallel workers"
    )

    cli_args = arg_parser.parse_args()

    # Build the merger from the parsed CLI options and run it.
    entity_merger = EntityMerger(
        meta_config=cli_args.meta_config,
        resp_agent=cli_args.resp_agent,
        entity_types=cli_args.entity_types,
        stop_file_path=cli_args.stop_file,
        workers=cli_args.workers,
    )
    entity_merger.process_folder(cli_args.csv_folder)


if __name__ == "__main__":
    main()