Coverage for oc_meta/run/merge/entities.py: 84%

205 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import concurrent.futures 

7import csv 

8import logging 

9import multiprocessing 

10import os 

11import traceback 

12from typing import Dict, List, Set 

13 

14from oc_ocdm.graph import GraphSet 

15from rdflib import URIRef 

16from rich_argparse import RichHelpFormatter 

17from sparqlite import SPARQLClient 

18from tqdm import tqdm 

19 

20from oc_meta.core.editor import MetaEditor 

21 

# Module-wide logging: timestamped INFO-level messages on the root logger.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

28 

29 

class EntityMerger:
    """Merge OpenCitations Meta entities listed in CSV files.

    Each CSV row names a ``surviving_entity`` and a ``"; "``-separated list of
    ``merged_entities``. Files are processed in parallel (one process per CSV
    file) and rows are marked ``Done`` once their merges have been saved.
    A stop file on disk requests a graceful halt between files.
    """

    def __init__(
        self,
        meta_config: str,
        resp_agent: str,
        entity_types: List[str],
        stop_file_path: str,
        workers: int,
    ):
        self.meta_config = meta_config
        self.resp_agent = resp_agent
        self.entity_types = entity_types
        self.stop_file_path = stop_file_path
        self.workers = workers
        # Maximum number of entities per SPARQL UNION query / import batch.
        self.batch_size = 10

    @staticmethod
    def get_entity_type(entity_url: str) -> str | None:
        """Return the OC Meta type abbreviation ("br", "ra", "id", ...)
        encoded in *entity_url*, or None for non-OC-Meta URLs."""
        parts = entity_url.split("/")
        if "oc" in parts and "meta" in parts:
            try:
                return parts[parts.index("meta") + 1]
            except IndexError:
                # URL ends right after ".../meta" with no type segment.
                return None
        return None

    @staticmethod
    def read_csv(csv_file: str) -> List[Dict]:
        """Read *csv_file* into a list of row dicts, ensuring every row has
        a "Done" column (defaulting to "False")."""
        data = []
        with open(csv_file, mode="r", newline="", encoding="utf-8") as file:
            csv_reader = csv.DictReader(file)
            for row in csv_reader:
                if "Done" not in row:
                    row["Done"] = "False"
                data.append(row)
        return data

    @staticmethod
    def write_csv(csv_file: str, data: List[Dict]):
        """Write *data* (a list of dicts sharing the same keys) to *csv_file*.

        Does nothing when *data* is empty, since no fieldnames can be derived
        from an empty list (previously this raised IndexError).
        """
        if not data:
            return
        fieldnames = data[0].keys()
        with open(csv_file, mode="w", newline="", encoding="utf-8") as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)

    @staticmethod
    def count_csv_rows(csv_file: str) -> int:
        """Return the number of data rows in *csv_file* (header excluded)."""
        with open(csv_file, "r", encoding="utf-8") as f:
            return sum(1 for _ in f) - 1

    def fetch_related_entities_batch(
        self,
        meta_editor: MetaEditor,
        merged_entities: List[str],
        surviving_entities: List[str],
        batch_size: int = 10,
    ) -> Set[URIRef]:
        """
        Fetch all related entities in batches and populate the relationship cache.

        Args:
            meta_editor: MetaEditor instance
            merged_entities: List of entities to be merged
            surviving_entities: List of surviving entities
            batch_size: Maximum number of entities to process in a single SPARQL query

        Returns:
            Set of all related entities
        """
        all_related_entities: Set[URIRef] = set()

        with SPARQLClient(
            meta_editor.endpoint, max_retries=5, backoff_factor=0.3, timeout=3600
        ) as client:
            # Merged entities are related in both directions: triples where
            # they appear as object and triples where they appear as subject.
            self._process_batches(
                client,
                meta_editor,
                merged_entities,
                lambda entity: [
                    f"{{?entity ?p <{entity}>}}",
                    f"{{<{entity}> ?p ?entity}}",
                ],
                batch_size,
                all_related_entities,
                "merged",
            )
            # For surviving entities only outgoing triples are collected.
            self._process_batches(
                client,
                meta_editor,
                surviving_entities,
                lambda entity: [f"{{<{entity}> ?p ?entity}}"],
                batch_size,
                all_related_entities,
                "surviving",
            )

        return all_related_entities

    def _process_batches(
        self,
        client,
        meta_editor: MetaEditor,
        entities: List[str],
        clause_builder,
        batch_size: int,
        all_related_entities: Set[URIRef],
        label: str,
    ) -> None:
        """Run batched SPARQL lookups for *entities* (shared by the merged and
        surviving passes of fetch_related_entities_batch) and update both the
        accumulator set and meta_editor.relationship_cache."""
        for i in range(0, len(entities), batch_size):
            batch = entities[i : i + batch_size]
            clauses = []
            for entity in batch:
                clauses.extend(clause_builder(entity))

            if not clauses:
                continue

            query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    {{
                        {' UNION '.join(clauses)}
                    }}
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}
            """

            try:
                results = client.query(query)
                batch_related: Set[URIRef] = set()
                for result in results["results"]["bindings"]:
                    if result["entity"]["type"] == "uri":
                        related_uri = URIRef(result["entity"]["value"])
                        all_related_entities.add(related_uri)
                        batch_related.add(related_uri)

                # The UNION query cannot attribute a related entity to a
                # specific batch member, so every entity in the batch receives
                # the whole batch's related set (a safe over-approximation).
                # This also avoids relying on the loop variable surviving the
                # results loop, which broke when a batch had zero bindings.
                for entity in batch:
                    entity_uri = URIRef(entity)
                    cache = meta_editor.relationship_cache.setdefault(
                        entity_uri, set()
                    )
                    cache.update(batch_related)

            except Exception as e:
                # Best-effort: a failed batch is logged (not printed) and the
                # remaining batches are still attempted.
                logger.error(
                    "Error fetching related entities for %s batch %d-%d: %s",
                    label,
                    i,
                    i + batch_size,
                    e,
                )

    def should_stop_processing(self) -> bool:
        """True when the user has created the stop file to request a halt."""
        return os.path.exists(self.stop_file_path)

    def process_file(self, csv_file: str) -> str:
        """Process a single CSV file with cross-row batch processing.

        Returns *csv_file* so the executor can report which file finished.
        """
        logger.info(f"Starting to process file: {csv_file}")
        data = self.read_csv(csv_file)
        logger.info(f"Read {len(data)} rows from {csv_file}")
        meta_editor = MetaEditor(self.meta_config, self.resp_agent, save_queries=True)
        modified = False

        if self.should_stop_processing():
            logger.info("Stop file detected, halting processing")
            return csv_file

        g_set = GraphSet(
            meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler
        )

        batch_merged_entities = []
        batch_surviving_entities = []
        rows_to_process = []

        # Collect every pending row whose surviving entity matches one of the
        # requested entity types.
        for row in data:
            if row.get("Done") == "True":
                continue

            entity_type = self.get_entity_type(row["surviving_entity"])
            if entity_type in self.entity_types:
                surviving_entity = row["surviving_entity"]
                merged_entities = row["merged_entities"].split("; ")
                batch_surviving_entities.append(surviving_entity)
                batch_merged_entities.extend(merged_entities)
                rows_to_process.append((surviving_entity, merged_entities))

        if not rows_to_process:
            logger.info(f"No rows to process in {csv_file}")
            return csv_file

        logger.info(f"Found {len(rows_to_process)} rows to process in {csv_file}")
        logger.info(
            f"Fetching related entities for {len(batch_merged_entities)} merged entities and {len(batch_surviving_entities)} surviving entities"
        )

        all_related_entities = self.fetch_related_entities_batch(
            meta_editor,
            batch_merged_entities,
            batch_surviving_entities,
            self.batch_size,
        )
        logger.info(f"Found {len(all_related_entities)} related entities")

        # Import the merge participants plus everything they touch, skipping
        # entities already present in the editor's cache.
        entities_to_import = all_related_entities.copy()
        entities_to_import.update(URIRef(e) for e in batch_surviving_entities)
        entities_to_import.update(URIRef(e) for e in batch_merged_entities)

        entities_to_import = {
            e for e in entities_to_import if not meta_editor.entity_cache.is_cached(e)
        }

        if entities_to_import:
            logger.info(f"Importing {len(entities_to_import)} new entities")
            try:
                meta_editor.reader.import_entities_from_triplestore(
                    g_set=g_set,
                    ts_url=meta_editor.endpoint,
                    entities=list(entities_to_import),
                    resp_agent=meta_editor.resp_agent,
                    enable_validation=False,
                    batch_size=self.batch_size,
                )

                for entity in entities_to_import:
                    meta_editor.entity_cache.add(entity)
                logger.info("Entity import completed successfully")

            except ValueError as e:
                logger.error(f"Error importing entities: {e}")
            modified = True

        processed_count = 0
        for surviving_entity, merged_entities in rows_to_process:
            logger.info(f"Processing row - surviving entity: {surviving_entity}")
            for merged_entity in merged_entities:
                logger.info(
                    f" Attempting to merge {merged_entity} into {surviving_entity}"
                )
                try:
                    meta_editor.merge(g_set, surviving_entity, merged_entity)
                    modified = True
                    processed_count += 1
                    logger.info(f" Successfully merged {merged_entity}")
                except ValueError as e:
                    # A failed merge is logged and the rest of the row's
                    # merges are still attempted.
                    logger.error(
                        f"Error merging {merged_entity} into {surviving_entity}: {e}"
                    )
                    continue
            logger.info(
                f"Completed processing row with surviving entity: {surviving_entity}"
            )

        logger.info(f"Successfully processed {processed_count} merges")

        if modified:
            # Mark every row matching the processed entity types as done, then
            # persist both the graph changes and the updated CSV.
            marked_done = 0
            for row in data:
                if (
                    row.get("Done") != "True"
                    and self.get_entity_type(row["surviving_entity"])
                    in self.entity_types
                ):
                    row["Done"] = "True"
                    marked_done += 1

            logger.info(f"Marked {marked_done} rows as done")
            meta_editor.save(g_set)
            self.write_csv(csv_file, data)
            logger.info(f"Saved changes to {csv_file}")

        return csv_file

    def process_folder(self, csv_folder: str):
        """Process every CSV file in *csv_folder* using a process pool of
        ``self.workers`` workers."""
        # A leftover stop file from a previous run would halt immediately.
        if os.path.exists(self.stop_file_path):
            os.remove(self.stop_file_path)

        csv_files = [
            os.path.join(csv_folder, file)
            for file in os.listdir(csv_folder)
            if file.endswith(".csv")
        ]

        # With many workers, skip very large files to keep memory bounded.
        if self.workers > 4:
            csv_files = [
                file for file in csv_files if self.count_csv_rows(file) <= 10000
            ]

        # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=self.workers,
            mp_context=multiprocessing.get_context('forkserver')
        ) as executor:
            futures = {}
            for csv_file in csv_files:
                if self.should_stop_processing():
                    break
                futures[executor.submit(self.process_file, csv_file)] = csv_file

            for future in tqdm(
                concurrent.futures.as_completed(futures),
                total=len(futures),
                desc="Overall Progress",
            ):
                csv_file = futures[future]
                try:
                    future.result()
                except Exception as e:
                    # Log (not print) the full context so a failed file does
                    # not abort the remaining futures.
                    error_trace = traceback.format_exc()
                    logger.error(
                        f"""
                        Error processing file {csv_file}:
                        Type: {type(e).__name__}
                        Details: {str(e)}
                        Full Traceback:
                        {error_trace}
                        Suggestion: This is an unexpected error. Please check the traceback for more details.
                        """
                    )

356 

357 

def main():
    """Command-line entry point: merge entities listed in a folder of CSVs."""
    parser = argparse.ArgumentParser(
        description="Merge entities from CSV files in a folder.",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "csv_folder", type=str, help="Path to the folder containing CSV files"
    )
    parser.add_argument("meta_config", type=str, help="Meta configuration string")
    parser.add_argument("resp_agent", type=str, help="Responsible agent string")
    parser.add_argument(
        "--entity_types",
        nargs="+",
        default=["ra", "br", "id"],
        help="Types of entities to merge (ra, br, id)",
    )
    parser.add_argument(
        "--stop_file", type=str, default="stop.out", help="Path to the stop file"
    )
    parser.add_argument(
        "--workers", type=int, default=4, help="Number of parallel workers"
    )

    args = parser.parse_args()

    # Build the merger straight from the parsed arguments and run it.
    EntityMerger(
        meta_config=args.meta_config,
        resp_agent=args.resp_agent,
        entity_types=args.entity_types,
        stop_file_path=args.stop_file,
        workers=args.workers,
    ).process_folder(args.csv_folder)


if __name__ == "__main__":
    main()