Coverage for oc_meta / run / meta_process.py: 73%

299 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2019 Silvio Peroni <silvio.peroni@unibo.it> 

2# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com> 

3# SPDX-FileCopyrightText: 2021 Simone Persiani <iosonopersia@gmail.com> 

4# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8from __future__ import annotations 

9 

10import bisect 

11import csv 

12import multiprocessing 

13import os 

14import sys 

15import traceback 

16from argparse import ArgumentParser 

17from datetime import datetime 

18from sys import executable, platform 

19from typing import Any, Dict, List, Optional, Tuple 

20 

21import orjson 

22import yaml 

23from oc_ocdm import Storer 

24from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler 

25from oc_ocdm.prov import ProvSet 

26from oc_ocdm.support.reporter import Reporter 

27from piccione.upload.on_triplestore import upload_sparql_updates 

28from rich_argparse import RichHelpFormatter 

29from time_agnostic_library.support import generate_config_file 

30 

31from oc_meta.core.creator import Creator 

32from oc_meta.core.curator import Curator 

33from oc_meta.lib.console import console, create_progress 

34from oc_meta.lib.file_manager import (get_csv_data, init_cache, normalize_path, 

35 pathoo, sort_files) 

36from oc_meta.lib.timer import ProcessTimer 

37from oc_meta.run.benchmark.plotting import plot_incremental_progress 

38 

39 

def _upload_to_triplestore(endpoint: str, folder: str, redis_host: str, redis_port: int, redis_db: int, failed_file: str, stop_file: str, description: str = "Processing files") -> None:
    """Upload SPARQL queries from folder to triplestore endpoint.

    Any failure aborts the whole process with exit code 1, so the parent
    worker process reports a non-zero exitcode to its caller.
    """
    upload_kwargs = {
        "endpoint": endpoint,
        "folder": folder,
        "failed_file": failed_file,
        "stop_file": stop_file,
        "redis_host": redis_host,
        "redis_port": redis_port,
        "redis_db": redis_db,
        "description": description,
        "show_progress": False,
    }
    try:
        upload_sparql_updates(**upload_kwargs)
    except Exception as e:
        console.print(f"[red]Upload to {endpoint} failed: {e}[/red]")
        sys.exit(1)

57 

58 

59def _generate_queries_worker(storer: Storer, triplestore_url: str, base_dir: str) -> None: 

60 storer.upload_all( 

61 triplestore_url=triplestore_url, 

62 base_dir=base_dir, 

63 batch_size=10, 

64 ) 

65 

66 

67def _store_rdf_worker(storer: Storer, base_dir, base_iri): 

68 storer.store_all( 

69 base_dir=base_dir, 

70 base_iri=base_iri, 

71 ) 

72 

73 

class MetaProcess:
    """Orchestrates one OC Meta processing run.

    Reads input CSVs, curates and deduplicates them (Curator), materialises
    OCDM entities (Creator), generates provenance, and stores/uploads the
    results: RDF files on disk plus SPARQL updates pushed to two separate
    triplestores (data and provenance). Heavy storage/upload steps run in
    ``forkserver`` subprocesses to avoid fork-with-threads deadlocks.
    """

    def __init__(self, settings: dict, meta_config_path: str, timer: Optional[ProcessTimer] = None):
        """Read mandatory and optional settings and prepare derived paths.

        :param settings: parsed YAML configuration dictionary
        :param meta_config_path: path of the config file; the
            time-agnostic-library config is created next to it if missing
        :param timer: optional shared ProcessTimer; a disabled one is used
            when none is given
        """
        self.settings = settings
        # Mandatory settings
        self.triplestore_url = settings["triplestore_url"] # Main triplestore for data
        self.provenance_triplestore_url = settings["provenance_triplestore_url"] # Separate triplestore for provenance
        self.input_csv_dir = normalize_path(settings["input_csv_dir"])
        self.base_output_dir = normalize_path(settings["base_output_dir"])
        self.resp_agent = settings["resp_agent"]
        self.output_csv_dir = os.path.join(self.base_output_dir, "csv")
        # Trailing os.sep is kept: downstream code treats this as a directory prefix.
        self.output_rdf_dir = (
            normalize_path(settings["output_rdf_dir"]) + os.sep + "rdf" + os.sep
        )
        self.cache_path = os.path.join(self.base_output_dir, "cache.txt")
        self.errors_path = os.path.join(self.base_output_dir, "errors.txt")
        # Disabled timer by default so timing calls become no-ops.
        self.timer = timer or ProcessTimer(enabled=False)
        # Optional settings
        self.base_iri = settings["base_iri"]
        self.dir_split_number = settings["dir_split_number"]
        self.items_per_file = settings["items_per_file"]
        self.default_dir = settings["default_dir"]
        self.zip_output_rdf = settings["zip_output_rdf"]
        self.source = settings["source"]
        supplier_prefix: str = settings["supplier_prefix"]
        # Supplier prefixes must end with "0"; append one if the configured
        # value does not already have it.
        self.supplier_prefix = (
            supplier_prefix if supplier_prefix.endswith("0") else f"{supplier_prefix}0"
        )
        self.silencer = settings["silencer"]
        # When True, only RDF files are written; no SPARQL query generation/upload.
        self.rdf_files_only = settings.get("rdf_files_only", False)
        # Time-Agnostic_library integration
        self.time_agnostic_library_config = os.path.join(
            os.path.dirname(meta_config_path), "time_agnostic_library_config.json"
        )
        # Generate the time-agnostic-library config once, next to the Meta config.
        if not os.path.exists(self.time_agnostic_library_config):
            generate_config_file(
                config_path=self.time_agnostic_library_config,
                dataset_urls=[self.triplestore_url],
                dataset_dirs=list(),
                # Use the configured endpoint list when it already contains the
                # provenance triplestore, otherwise fall back to just that URL.
                provenance_urls=[self.provenance_triplestore_url] if self.provenance_triplestore_url not in settings["provenance_endpoints"] else settings["provenance_endpoints"],
                provenance_dirs=list(),
                blazegraph_full_text_search=settings["blazegraph_full_text_search"],
                fuseki_full_text_search=settings["fuseki_full_text_search"],
                virtuoso_full_text_search=settings["virtuoso_full_text_search"],
                graphdb_connector_name=settings["graphdb_connector_name"],
            )

        # Redis settings
        self.redis_host = settings.get("redis_host", "localhost")
        self.redis_port = settings.get("redis_port", 6379)
        self.redis_db = settings.get("redis_db", 5)          # counters database
        self.redis_cache_db = settings.get("redis_cache_db", 2)  # upload-cache database

        self.counter_handler = RedisCounterHandler(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )

        # Triplestore upload settings
        self.ts_failed_queries = settings.get("ts_failed_queries", "failed_queries.txt")
        self.ts_stop_file = settings.get("ts_stop_file", ".stop_upload")

        # Staging folders where SPARQL updates are written before upload.
        self.data_update_dir = os.path.join(self.base_output_dir, "to_be_uploaded_data")
        self.prov_update_dir = os.path.join(self.base_output_dir, "to_be_uploaded_prov")

    def prepare_folders(self) -> List[str]:
        """Return the sorted list of input CSV filenames not yet processed.

        Filenames already recorded in the cache file are skipped; the output
        CSV directory is created if missing.
        """
        completed = init_cache(self.cache_path)
        files_in_input_csv_dir = {
            filename
            for filename in os.listdir(self.input_csv_dir)
            if filename.endswith(".csv")
        }
        files_to_be_processed = sort_files(
            list(files_in_input_csv_dir.difference(completed))
        )
        pathoo(self.output_csv_dir)
        # NOTE(review): 128 looks suspiciously small for a CSV field size limit
        # (the csv module default is 131072); presumably get_csv_data overrides
        # it, or a multiplier (e.g. 128 * 1024) was intended — confirm.
        csv.field_size_limit(128)
        return files_to_be_processed

    def curate_and_create(
        self,
        filename: str,
        cache_path: str,
        errors_path: str,
        settings: dict | None = None,
        meta_config_path: str | None = None,
        progress=None,
    ) -> Tuple[dict, str, str, str]:
        """Curate one input CSV and create/store its RDF in batches.

        Returns a 4-tuple ``(message, cache_path, errors_path, filename)``
        where ``message["message"]`` is ``"success"`` or, on any exception,
        a formatted error report (the exception is never re-raised so the
        caller can log it via ``task_done``).
        """
        try:
            with self.timer.timer("total_processing"):
                filepath = os.path.join(self.input_csv_dir, filename)
                console.print(filepath)
                data = get_csv_data(filepath)
                self.timer.record_metric("input_records", len(data))

                min_rows_parallel = settings.get("min_rows_parallel", 1000) if settings else 1000
                # Curation: deduplication / enrichment against the triplestore.
                curator_obj = Curator(
                    data=data,
                    ts=self.triplestore_url,
                    prov_config=self.time_agnostic_library_config,
                    counter_handler=self.counter_handler,
                    base_iri=self.base_iri,
                    prefix=self.supplier_prefix,
                    settings=settings,
                    silencer=self.silencer,
                    meta_config_path=meta_config_path,
                    timer=self.timer,
                    progress=progress,
                    min_rows_parallel=min_rows_parallel,
                )
                # Timestamped name for the curated CSV output.
                name = f"{filename.replace('.csv', '')}_{datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}"
                curator_obj.curator(
                    filename=name, path_csv=self.output_csv_dir
                )
                self.timer.record_metric("curated_records", len(curator_obj.data))

                local_g_size = len(curator_obj.finder.graph)
                self.timer.record_metric("local_g_triples", local_g_size)
                preexisting_count = len(curator_obj.preexisting_entities)
                self.timer.record_metric("preexisting_entities_count", preexisting_count)

                # Build RDF in bounded batches to cap peak memory usage.
                RDF_BATCH_SIZE = 100_000
                data = curator_obj.data
                n_batches = (len(data) + RDF_BATCH_SIZE - 1) // RDF_BATCH_SIZE
                total_entities = 0
                total_modified = 0

                batch_task_id = None
                if progress is not None and n_batches > 1:
                    batch_task_id = progress.add_task(
                        # NOTE(review): "(unknown)" appears to be a redacted
                        # placeholder (likely the filename) — confirm intent.
                        f" [cyan]RDF batches[/cyan] ((unknown))",
                        total=n_batches,
                    )

                for batch_idx in range(n_batches):
                    batch_start = batch_idx * RDF_BATCH_SIZE
                    batch_end = min(batch_start + RDF_BATCH_SIZE, len(data))
                    batch_data = data[batch_start:batch_end]

                    with self.timer.timer("rdf_creation"):
                        creator_obj = Creator(
                            data=batch_data,
                            finder=curator_obj.finder,
                            base_iri=self.base_iri,
                            counter_handler=self.counter_handler,
                            supplier_prefix=self.supplier_prefix,
                            resp_agent=self.resp_agent,
                            ra_index=curator_obj.index_id_ra,
                            br_index=curator_obj.index_id_br,
                            re_index_csv=curator_obj.re_index,
                            ar_index_csv=curator_obj.ar_index,
                            vi_index=curator_obj.VolIss,
                            silencer=self.silencer,
                            progress=progress,
                        )
                        creator = creator_obj.creator(source=self.source)
                        total_entities += len(creator.res_to_entity)

                    # Provenance snapshots for every entity touched in this batch.
                    prov = ProvSet(
                        creator,
                        self.base_iri,
                        wanted_label=False,
                        supplier_prefix=self.supplier_prefix,
                        custom_counter_handler=self.counter_handler,
                    )
                    modified_entities = prov.generate_provenance()
                    total_modified += len(modified_entities)

                    repok = Reporter(print_sentences=False)
                    reperr = Reporter(print_sentences=True, prefix="[Storer: ERROR] ")
                    res_storer = Storer(
                        abstract_set=creator,
                        repok=repok,
                        reperr=reperr,
                        dir_split=self.dir_split_number,
                        n_file_item=self.items_per_file,
                        default_dir=self.default_dir,
                        output_format="json-ld",
                        zip_output=self.zip_output_rdf,
                        modified_entities=modified_entities,
                    )
                    prov_storer = Storer(
                        abstract_set=prov,
                        repok=repok,
                        reperr=reperr,
                        dir_split=self.dir_split_number,
                        n_file_item=self.items_per_file,
                        output_format="json-ld",
                        zip_output=self.zip_output_rdf,
                        modified_entities=modified_entities,
                    )
                    self.store_data_and_prov(res_storer, prov_storer)
                    # Free batch objects eagerly before the next iteration to
                    # keep peak RSS bounded by one batch.
                    del creator_obj, creator, prov, res_storer, prov_storer, modified_entities

                    if progress is not None and batch_task_id is not None:
                        progress.update(batch_task_id, advance=1)

                if progress is not None and batch_task_id is not None:
                    progress.remove_task(batch_task_id)

                self.timer.record_metric("entities_created", total_entities)
                self.timer.record_metric("modified_entities", total_modified)

            return {"message": "success"}, cache_path, errors_path, filename
        except Exception as e:
            # Never propagate: package the traceback so the caller can write
            # it to the errors file and keep processing other inputs.
            tb = traceback.format_exc()
            template = (
                "An exception of type {0} occurred. Arguments:\n{1!r}\nTraceback:\n{2}"
            )
            message = template.format(type(e).__name__, e.args, tb)
            return {"message": message}, cache_path, errors_path, filename

    def _setup_output_directories(self) -> None:
        """Create output directories for data and provenance."""
        os.makedirs(self.data_update_dir, exist_ok=True)
        os.makedirs(self.prov_update_dir, exist_ok=True)

    def _upload_sparql_queries(self) -> None:
        """Upload SPARQL queries to triplestores in parallel.

        Data and provenance go to their own endpoints in two subprocesses;
        raises RuntimeError if either worker exits non-zero.
        """
        data_upload_folder = os.path.join(self.data_update_dir, "to_be_uploaded")
        prov_upload_folder = os.path.join(self.prov_update_dir, "to_be_uploaded")

        # Use forkserver to avoid deadlocks when forking from a multi-threaded process.
        # Libraries like Redis and rdflib create background threads, and fork() would
        # copy locked mutexes into the child process, causing hangs.
        ctx = multiprocessing.get_context('forkserver')

        data_process = ctx.Process(
            target=_upload_to_triplestore,
            args=(
                self.triplestore_url,
                data_upload_folder,
                self.redis_host,
                self.redis_port,
                self.redis_cache_db,
                self.ts_failed_queries,
                self.ts_stop_file,
                "Uploading data SPARQL"
            )
        )

        prov_process = ctx.Process(
            target=_upload_to_triplestore,
            args=(
                self.provenance_triplestore_url,
                prov_upload_folder,
                self.redis_host,
                self.redis_port,
                self.redis_cache_db,
                self.ts_failed_queries,
                self.ts_stop_file,
                "Uploading prov SPARQL"
            )
        )

        data_process.start()
        prov_process.start()

        data_process.join()
        prov_process.join()

        if data_process.exitcode != 0:
            raise RuntimeError(f"Data upload failed with exit code {data_process.exitcode}")
        if prov_process.exitcode != 0:
            raise RuntimeError(f"Provenance upload failed with exit code {prov_process.exitcode}")

    def store_data_and_prov(
        self, res_storer: Storer, prov_storer: Storer
    ) -> None:
        """Orchestrate storage and upload."""
        if not self.rdf_files_only:
            self._setup_output_directories()
        self._store_and_upload(res_storer, prov_storer, self.timer)

    def _store_and_upload(
        self, res_storer: Storer, prov_storer: Storer, timer: ProcessTimer
    ) -> None:
        """Store RDF files and upload queries to triplestore with parallel execution."""
        with timer.timer("storage"):
            # Use forkserver to avoid deadlocks when forking from a multi-threaded process.
            # Libraries like Redis and rdflib create background threads, and fork() would
            # copy locked mutexes into the child process, causing hangs.
            ctx = multiprocessing.get_context('forkserver')

            # RDF file serialisation runs concurrently with query generation/upload.
            data_store_process = ctx.Process(
                target=_store_rdf_worker,
                args=(res_storer, self.output_rdf_dir, self.base_iri)
            )
            prov_store_process = ctx.Process(
                target=_store_rdf_worker,
                args=(prov_storer, self.output_rdf_dir, self.base_iri)
            )
            rdf_store_processes = [data_store_process, prov_store_process]
            for p in rdf_store_processes:
                p.start()

            if not self.rdf_files_only:
                # Generate SPARQL updates first, then upload them.
                data_query_process = ctx.Process(
                    target=_generate_queries_worker,
                    args=(res_storer, self.triplestore_url, self.data_update_dir)
                )
                prov_query_process = ctx.Process(
                    target=_generate_queries_worker,
                    args=(prov_storer, self.provenance_triplestore_url, self.prov_update_dir)
                )
                data_query_process.start()
                prov_query_process.start()
                data_query_process.join()
                prov_query_process.join()

                if data_query_process.exitcode != 0:
                    raise RuntimeError(f"Data query generation failed with exit code {data_query_process.exitcode}")
                if prov_query_process.exitcode != 0:
                    raise RuntimeError(f"Prov query generation failed with exit code {prov_query_process.exitcode}")

                self._upload_sparql_queries()

            # Wait for RDF serialisation last; any failure aborts the run.
            for p in rdf_store_processes:
                p.join()
                if p.exitcode != 0:
                    raise RuntimeError(f"RDF storage failed with exit code {p.exitcode}")

    def run_sparql_updates(self, endpoint: str, folder: str):
        """Synchronously upload the SPARQL update files in *folder* to *endpoint*."""
        upload_sparql_updates(
            endpoint=endpoint,
            folder=folder,
            failed_file=self.ts_failed_queries,
            stop_file=self.ts_stop_file,
            redis_host=self.redis_host,
            redis_port=self.redis_port,
            redis_db=self.redis_cache_db,
        )

404 

405 

def _save_incremental_report(all_reports: List[Dict[str, Any]], meta_config_path: str, output_path: str) -> None:
    """Save incremental timing report to JSON file.

    Writes a snapshot containing every per-file report so far plus the
    aggregate metrics computed over them.
    """
    payload: Dict[str, Any] = {}
    payload["timestamp"] = datetime.now().isoformat()
    payload["config_path"] = meta_config_path
    payload["total_files_processed"] = len(all_reports)
    payload["files"] = all_reports
    payload["aggregate"] = _compute_aggregate_metrics(all_reports)
    serialized = orjson.dumps(payload, option=orjson.OPT_INDENT_2)
    with open(output_path, 'wb') as out:
        out.write(serialized)

417 

418 

419def _get_file_peak_memory(report: Dict[str, Any]) -> float: 

420 """Get peak memory (MB) across all phases in a file report.""" 

421 phases = report["report"]["phases"] 

422 peaks = [p["peak_memory_mb"] for p in phases if p["peak_memory_mb"]] 

423 return max(peaks) if peaks else 0 

424 

425 

def _compute_aggregate_metrics(all_reports: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Compute aggregate statistics across all file reports.

    Returns an empty dict for an empty input; memory keys are present only
    when at least one file recorded a non-zero peak.
    """
    if not all_reports:
        return {}

    durations: List[float] = []
    throughputs: List[float] = []
    total_records = 0
    total_entities = 0
    for entry in all_reports:
        metrics = entry["report"]["metrics"]
        durations.append(metrics.get("total_duration_seconds", 0))
        throughputs.append(metrics.get("throughput_records_per_sec", 0))
        total_records += metrics.get("input_records", 0)
        total_entities += metrics.get("entities_created", 0)
    total_duration = sum(durations)

    # Ignore files that never recorded a memory peak.
    non_zero_peaks = [
        peak for peak in (_get_file_peak_memory(entry) for entry in all_reports) if peak
    ]

    result: Dict[str, Any] = {
        "total_files": len(all_reports),
        "total_duration_seconds": round(total_duration, 3),
        "total_records_processed": total_records,
        "total_entities_created": total_entities,
        "average_time_per_file": round(total_duration / len(all_reports), 3) if all_reports else 0,
        "average_throughput": round(sum(throughputs) / len(throughputs), 2) if throughputs else 0,
        "min_time": round(min(durations), 3) if durations else 0,
        "max_time": round(max(durations), 3) if durations else 0,
        "overall_throughput": round(total_records / total_duration, 2) if total_duration > 0 else 0,
    }
    if non_zero_peaks:
        result["peak_memory_mb"] = round(max(non_zero_peaks), 1)
        result["average_peak_memory_mb"] = round(sum(non_zero_peaks) / len(non_zero_peaks), 1)
    return result

456 

457 

def _print_aggregate_summary(all_reports: List[Dict[str, Any]]) -> None:
    """Print aggregate summary of all processed files."""
    stats = _compute_aggregate_metrics(all_reports)
    rule = "=" * 60

    console.print(f"\n{rule}")
    console.print("[bold]Aggregate Timing Summary[/bold]")
    console.print(rule)
    console.print(f"Total Files: {stats['total_files']}")
    console.print(f"Total Duration: {stats['total_duration_seconds']}s")
    console.print(f"Total Records: {stats['total_records_processed']}")
    console.print(f"Total Entities: {stats['total_entities_created']}")
    console.print(f"Average Time/File: {stats['average_time_per_file']}s")
    console.print(f"Min/Max Time: {stats['min_time']}s / {stats['max_time']}s")
    console.print(f"Overall Throughput: {stats['overall_throughput']} rec/s")
    # Memory lines only when at least one file reported a peak.
    if "peak_memory_mb" in stats:
        console.print(f"Peak Memory (RSS): {stats['peak_memory_mb']} MB")
        console.print(f"Avg Peak Memory: {stats['average_peak_memory_mb']} MB")
    console.print(f"{rule}\n")

476 

477 

def run_meta_process(
    settings: dict, meta_config_path: str, enable_timing: bool = False, timing_output: Optional[str] = None
) -> None:
    """Run the whole Meta workflow over every pending input CSV.

    Processes files sequentially with a progress bar, honouring a ``.stop``
    marker file in the output directory; on normal completion the cache file
    is archived with a timestamp and (on Unix) leftover ``.lock`` files are
    removed. When *enable_timing* is set, per-file and aggregate timing
    reports are printed and optionally saved to *timing_output* as JSON.
    """
    is_unix = platform in {"linux", "linux2", "darwin"}
    all_reports = []

    meta_process_setup = MetaProcess(settings=settings, meta_config_path=meta_config_path)
    files_to_be_processed = meta_process_setup.prepare_folders()

    # Write gently_run/gently_stop helper scripts and clear any stale .stop file.
    generate_gentle_buttons(meta_process_setup.base_output_dir, meta_config_path, is_unix)

    with create_progress() as progress:
        task_id = progress.add_task("Processing files", total=len(files_to_be_processed))
        for idx, filename in enumerate(files_to_be_processed, 1):
            try:
                # Graceful stop: check for the marker before each file.
                if os.path.exists(os.path.join(meta_process_setup.base_output_dir, ".stop")):
                    console.print("\n[yellow]Stop file detected. Halting processing.[/yellow]")
                    break

                if enable_timing:
                    # NOTE(review): "(unknown)" appears to be a redacted
                    # placeholder (likely the filename) — confirm intent.
                    console.print(f"\n[cyan][{idx}/{len(files_to_be_processed)}][/cyan] Processing (unknown)...")

                on_phase_cb = None
                if enable_timing and timing_output:
                    _chart = timing_output.replace('.json', '_chart.png')
                    # Bind loop state explicitly so the closure below does not
                    # capture late-bound loop variables.
                    _reports, _fn, _cfg, _out = all_reports, filename, meta_config_path, timing_output
                    _include_storage = not settings.get("rdf_files_only", False)
                    def _on_phase(timer: ProcessTimer) -> None:
                        # Snapshot = completed reports + the in-flight file's report.
                        snapshot = _reports + [{"filename": _fn, "report": timer.get_report()}]
                        _save_incremental_report(snapshot, _cfg, _out)
                        plot_incremental_progress(snapshot, _chart, include_storage=_include_storage)
                    on_phase_cb = _on_phase

                # Fresh timer per file so metrics do not leak across files.
                file_timer = ProcessTimer(enabled=enable_timing, verbose=enable_timing, on_phase_complete=on_phase_cb)
                meta_process_setup.timer = file_timer

                result = meta_process_setup.curate_and_create(
                    filename,
                    meta_process_setup.cache_path,
                    meta_process_setup.errors_path,
                    settings=settings,
                    meta_config_path=meta_config_path,
                    progress=progress,
                )
                # Record success in the cache file or the error in errors.txt.
                task_done(result)

                if enable_timing:
                    report = file_timer.get_report()
                    all_reports.append({
                        "filename": filename,
                        "report": report
                    })
                    file_timer.print_file_summary(filename)

            except Exception as e:
                # Log and continue with the next file rather than aborting the run.
                traceback_str = traceback.format_exc()
                console.print(
                    f"[red]Error processing file (unknown): {e}\nTraceback:\n{traceback_str}[/red]"
                )
            finally:
                progress.advance(task_id)

    # Only finalise when the run was not interrupted by a .stop marker.
    if not os.path.exists(os.path.join(meta_process_setup.base_output_dir, ".stop")):
        if os.path.exists(meta_process_setup.cache_path):
            # Archive the cache with a timestamp so the next run starts fresh.
            os.rename(
                meta_process_setup.cache_path,
                meta_process_setup.cache_path.replace(
                    ".txt", f'_{datetime.now().strftime("%Y-%m-%dT%H_%M_%S_%f")}.txt'
                ),
            )
        if is_unix:
            delete_lock_files(base_dir=meta_process_setup.base_output_dir)

    if enable_timing and all_reports:
        _print_aggregate_summary(all_reports)
        if timing_output:
            aggregate_report = {
                "timestamp": datetime.now().isoformat(),
                "config_path": meta_config_path,
                "total_files": len(all_reports),
                "files": all_reports,
                "aggregate": _compute_aggregate_metrics(all_reports)
            }
            with open(timing_output, 'wb') as f:
                f.write(orjson.dumps(aggregate_report, option=orjson.OPT_INDENT_2))
            console.print(f"[green][Timing] Report saved to {timing_output}[/green]")

564 

565 

566def _cache_sort_key(filename: str) -> int: 

567 return int(filename.replace(".csv", "")) 

568 

569 

def task_done(task_output: tuple) -> None:
    """Record the outcome of one processed file.

    Successful files are appended to the cache file (kept numerically
    sorted when possible); failures are appended to the errors file;
    ``"skip"`` outcomes are ignored.
    """
    message, cache_path, errors_path, filename = task_output
    outcome = message["message"]
    if outcome == "skip":
        return
    if outcome != "success":
        with open(errors_path, "a", encoding="utf-8") as err_file:
            err_file.write(f'(unknown): {message["message"]}' + "\n")
        return
    if not os.path.exists(cache_path):
        with open(cache_path, "w", encoding="utf-8") as cache_file:
            cache_file.write(filename + "\n")
        return
    with open(cache_path, "r", encoding="utf-8") as cache_file:
        entries = cache_file.read().splitlines()
    try:
        bisect.insort(entries, filename, key=_cache_sort_key)
    except ValueError:
        # Non-numeric filename (e.g. "data.csv"): append without ordering
        entries.append(filename)
    with open(cache_path, "w", encoding="utf-8") as cache_file:
        cache_file.write("\n".join(entries))

592 

def delete_lock_files(base_dir: str) -> None:
    """Recursively remove every ``*.lock`` file under *base_dir*."""
    for current_dir, _subdirs, entries in os.walk(base_dir):
        stale_locks = [entry for entry in entries if entry.endswith(".lock")]
        for lock_name in stale_locks:
            os.remove(os.path.join(current_dir, lock_name))

598 

599 

def generate_gentle_buttons(dir: str, config: str, is_unix: bool):
    """Write gently_run / gently_stop helper scripts in the current directory.

    Also removes any stale ``.stop`` marker in *dir* so a new run starts
    unblocked. The scripts use the current Python interpreter.
    """
    stop_marker = os.path.join(dir, ".stop")
    if os.path.exists(stop_marker):
        os.remove(stop_marker)
    ext = "sh" if is_unix else "bat"
    run_script = f'{executable} -m oc_meta.lib.stopper -t "{dir}" --remove\n{executable} -m oc_meta.run.meta_process -c {config}'
    stop_script = f'{executable} -m oc_meta.lib.stopper -t "{dir}" --add'
    with open(f"gently_run.{ext}", "w") as script_file:
        script_file.write(run_script)
    with open(f"gently_stop.{ext}", "w") as script_file:
        script_file.write(stop_script)

610 

611 

if __name__ == "__main__":  # pragma: no cover
    # CLI entry point: parse options, load the YAML config, run the workflow.
    parser = ArgumentParser(
        "meta_process.py",
        description="This script runs the OCMeta data processing workflow",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "-c",
        "--config",
        dest="config",
        required=True,
        help="Configuration file directory",
    )
    parser.add_argument(
        "--timing",
        action="store_true",
        help="Enable timing metrics collection and display summary at the end",
    )
    parser.add_argument(
        "--timing-output",
        dest="timing_output",
        default=None,
        help="Optional path to save timing report as JSON file",
    )
    cli_args = parser.parse_args()
    with open(cli_args.config, encoding="utf-8") as config_file:
        loaded_settings = yaml.full_load(config_file)
    run_meta_process(
        settings=loaded_settings,
        meta_config_path=cli_args.config,
        enable_timing=cli_args.timing,
        timing_output=cli_args.timing_output,
    )