Coverage for oc_meta / run / meta_process.py: 77%

305 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright 2019 Silvio Peroni <essepuntato@gmail.com> 

4# Copyright 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com> 

5# Copyright 2021 Simone Persiani <iosonopersia@gmail.com> 

6# Copyright 2021-2022 Arcangelo Massari <arcangelo.massari@unibo.it> 

7# 

8# Permission to use, copy, modify, and/or distribute this software for any purpose 

9# with or without fee is hereby granted, provided that the above copyright notice 

10# and this permission notice appear in all copies. 

11# 

12# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

13# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

14# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

15# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

16# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

17# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

18# SOFTWARE. 

19 

20from __future__ import annotations 

21 

22import csv 

23import glob 

24import json 

25import multiprocessing 

26import os 

27import sys 

28import traceback 

29from argparse import ArgumentParser 

30from datetime import datetime 

31from sys import executable, platform 

32from typing import Any, Dict, Iterator, List, Optional, Tuple 

33 

34import redis 

35import yaml 

36from oc_ocdm import Storer 

37from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler 

38from oc_ocdm.prov import ProvSet 

39from oc_ocdm.support.reporter import Reporter 

40from piccione.upload.on_triplestore import upload_sparql_updates 

41from time_agnostic_library.support import generate_config_file 

42from tqdm import tqdm 

43from virtuoso_utilities.bulk_load import bulk_load 

44 

45from oc_meta.core.creator import Creator 

46from oc_meta.core.curator import Curator 

47from oc_meta.lib.file_manager import (get_csv_data, init_cache, normalize_path, 

48 pathoo, sort_files) 

49from oc_meta.lib.timer import ProcessTimer 

50from oc_meta.run.benchmark.plotting import plot_incremental_progress 

51 

52 

class BulkLoadError(Exception):
    """Raised when a Virtuoso bulk load operation fails.

    Wraps the underlying ``RuntimeError`` coming from ``bulk_load`` so that
    callers can distinguish bulk-load failures (which must abort the run)
    from ordinary per-file processing errors.
    """

56 

57 

def _upload_to_triplestore(endpoint: str, folder: str, redis_host: str, redis_port: int, redis_db: int, failed_file: str, stop_file: str, description: str = "Processing files") -> None:
    """Push the pending SPARQL update files in *folder* to *endpoint*.

    Intended to run inside a worker process: on any failure the error is
    reported on stderr and the process exits with status 1, so the parent
    can detect the failure through the exit code.
    """
    upload_kwargs = {
        "endpoint": endpoint,
        "folder": folder,
        "failed_file": failed_file,
        "stop_file": stop_file,
        "redis_host": redis_host,
        "redis_port": redis_port,
        "redis_db": redis_db,
        "description": description,
        "show_progress": False,
    }
    try:
        upload_sparql_updates(**upload_kwargs)
    except Exception as err:
        print(f"Upload to {endpoint} failed: {err}", file=sys.stderr)
        sys.exit(1)

75 

76 

def _run_bulk_load_process(args: tuple) -> None:
    """Execute a Virtuoso bulk load for one container in the current process.

    :param args: ``(container_name, bulk_load_dir, nquads_host_dir)`` —
        the Docker container to load into, the data directory as seen
        *inside* the container, and the host directory holding the
        ``*.nq.gz`` files to load.
    :raises BulkLoadError: if the underlying ``bulk_load`` call fails.
    """
    container_name, bulk_load_dir, nquads_host_dir = args

    # Nothing to load: skip the Docker round-trip entirely.
    nquads_files = glob.glob(os.path.join(nquads_host_dir, "*.nq.gz"))
    if not nquads_files:
        return

    # SECURITY: the DBA password was previously hard-coded to "dba".
    # Allow overriding it via the environment while keeping the old
    # default for backward compatibility.
    virtuoso_password = os.environ.get("VIRTUOSO_DBA_PASSWORD", "dba")

    try:
        bulk_load(
            data_directory=nquads_host_dir,
            password=virtuoso_password,
            docker_container=container_name,
            container_data_directory=bulk_load_dir,
            log_level="CRITICAL"
        )
    except RuntimeError as e:
        raise BulkLoadError(f"Bulk load failed for container {container_name}: {e}") from e

99 

100 

def _generate_queries_worker(storer: Storer, triplestore_url, base_dir, save_queries, prepare_bulk_load, bulk_load_dir):
    """Worker: delegate SPARQL update generation to the storer.

    Thin wrapper around ``Storer.upload_all`` so it can be used as a
    ``multiprocessing.Process`` target.
    """
    upload_options = dict(
        triplestore_url=triplestore_url,
        base_dir=base_dir,
        batch_size=10,
        save_queries=save_queries,
        prepare_bulk_load=prepare_bulk_load,
        bulk_load_dir=bulk_load_dir,
    )
    storer.upload_all(**upload_options)

110 

111 

def _store_rdf_worker(storer: Storer, base_dir, base_iri, context_path):
    """Worker: delegate RDF file storage to the storer.

    Thin wrapper around ``Storer.store_all`` so it can be used as a
    ``multiprocessing.Process`` target.
    """
    store_options = dict(
        base_dir=base_dir,
        base_iri=base_iri,
        context_path=context_path,
    )
    storer.store_all(**store_options)

118 

119 

class MetaProcess:
    """Orchestrate one OC Meta processing run.

    An instance wires together curation (``Curator``), RDF creation
    (``Creator``), provenance generation (``ProvSet``) and the storage /
    upload of both data and provenance, using Redis-backed counters and two
    triplestore endpoints (one for data, one for provenance).
    """

    def __init__(self, settings: dict, meta_config_path: str, timer: Optional[ProcessTimer] = None):
        """Read *settings* and prepare paths, Redis clients and the
        time-agnostic-library configuration.

        :param settings: parsed YAML configuration for the run.
        :param meta_config_path: path of the YAML config file; the
            ``time_agnostic_library_config.json`` is created next to it
            when missing.
        :param timer: optional ``ProcessTimer``; a disabled one is used
            when omitted.
        """
        self.settings = settings
        # Mandatory settings
        self.triplestore_url = settings["triplestore_url"]  # Main triplestore for data
        self.provenance_triplestore_url = settings["provenance_triplestore_url"]  # Separate triplestore for provenance
        self.input_csv_dir = normalize_path(settings["input_csv_dir"])
        self.base_output_dir = normalize_path(settings["base_output_dir"])
        self.resp_agent = settings["resp_agent"]
        self.info_dir = os.path.join(self.base_output_dir, "info_dir")
        self.output_csv_dir = os.path.join(self.base_output_dir, "csv")
        self.output_rdf_dir = (
            normalize_path(settings["output_rdf_dir"]) + os.sep + "rdf" + os.sep
        )
        self.cache_path = os.path.join(self.base_output_dir, "cache.txt")
        self.errors_path = os.path.join(self.base_output_dir, "errors.txt")
        self.timer = timer or ProcessTimer(enabled=False)
        # Optional settings
        self.base_iri = settings["base_iri"]
        self.normalize_titles = settings.get("normalize_titles", True)
        self.context_path = settings["context_path"]
        self.dir_split_number = settings["dir_split_number"]
        self.items_per_file = settings["items_per_file"]
        self.default_dir = settings["default_dir"]
        self.zip_output_rdf = settings["zip_output_rdf"]
        self.source = settings["source"]
        # DOI validation results are memoised only when the API service is on.
        self.valid_dois_cache = (
            dict() if bool(settings["use_doi_api_service"]) == True else None
        )
        # The supplier prefix must end with "0" (sequential-counter convention).
        supplier_prefix: str = settings["supplier_prefix"]
        self.supplier_prefix = (
            supplier_prefix if supplier_prefix.endswith("0") else f"{supplier_prefix}0"
        )
        self.silencer = settings["silencer"]
        self.generate_rdf_files = settings.get("generate_rdf_files", True)
        # Time-Agnostic_library integration: generate its config file next to
        # the Meta config if it does not already exist.
        self.time_agnostic_library_config = os.path.join(
            os.path.dirname(meta_config_path), "time_agnostic_library_config.json"
        )
        if not os.path.exists(self.time_agnostic_library_config):
            generate_config_file(
                config_path=self.time_agnostic_library_config,
                dataset_urls=[self.triplestore_url],
                dataset_dirs=list(),
                # Use the configured provenance endpoints when they already
                # include the provenance triplestore, otherwise just the latter.
                provenance_urls=[self.provenance_triplestore_url] if self.provenance_triplestore_url not in settings["provenance_endpoints"] else settings["provenance_endpoints"],
                provenance_dirs=list(),
                blazegraph_full_text_search=settings["blazegraph_full_text_search"],
                fuseki_full_text_search=settings["fuseki_full_text_search"],
                virtuoso_full_text_search=settings["virtuoso_full_text_search"],
                graphdb_connector_name=settings["graphdb_connector_name"],
            )

        # Redis settings: db for counters, cache_db for upload bookkeeping.
        self.redis_host = settings.get("redis_host", "localhost")
        self.redis_port = settings.get("redis_port", 6379)
        self.redis_db = settings.get("redis_db", 5)
        self.redis_cache_db = settings.get("redis_cache_db", 2)
        self.redis_client = redis.Redis(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )

        self.counter_handler = RedisCounterHandler(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )

        # Triplestore upload settings
        self.ts_failed_queries = settings.get("ts_failed_queries", "failed_queries.txt")
        self.ts_stop_file = settings.get("ts_stop_file", ".stop_upload")

        # Staging folders for generated SPARQL updates (data / provenance).
        self.data_update_dir = os.path.join(self.base_output_dir, "to_be_uploaded_data")
        self.prov_update_dir = os.path.join(self.base_output_dir, "to_be_uploaded_prov")

    def prepare_folders(self) -> List[str]:
        """Return the input CSV filenames that still need processing.

        Compares the input directory against the cache of already-completed
        files, creates the output CSV directory, and returns the remaining
        filenames in sorted order.
        """
        completed = init_cache(self.cache_path)
        files_in_input_csv_dir = {
            filename
            for filename in os.listdir(self.input_csv_dir)
            if filename.endswith(".csv")
        }
        files_to_be_processed = sort_files(
            list(files_in_input_csv_dir.difference(completed))
        )
        pathoo(self.output_csv_dir)
        # NOTE(review): 128 is an unusually *small* field size limit (the
        # csv module default is 131072); this call normally *raises* the
        # limit — confirm the value is intentional.
        csv.field_size_limit(128)
        return files_to_be_processed

    def curate_and_create(
        self,
        filename: str,
        cache_path: str,
        errors_path: str,
        settings: dict | None = None,
        meta_config_path: str | None = None,
    ) -> Tuple[dict, str, str, str]:
        """Curate one input CSV and create/store its RDF and provenance.

        :returns: ``(outcome, cache_path, errors_path, filename)`` where
            ``outcome["message"]`` is ``"success"`` or the formatted error
            text. A ``BulkLoadError`` is re-raised (it must abort the run);
            every other exception is captured into the returned message.
        """
        try:
            with self.timer.timer("total_processing"):
                filepath = os.path.join(self.input_csv_dir, filename)
                print(filepath)
                data = get_csv_data(filepath)
                self.timer.record_metric("input_records", len(data))

                # NOTE(review): this appends the supplier prefix to info_dir
                # on every call, so repeated calls on the same instance nest
                # the prefix — confirm each file uses a fresh MetaProcess.
                self.info_dir = os.path.join(self.info_dir, self.supplier_prefix)
                curator_obj = Curator(
                    data=data,
                    ts=self.triplestore_url,
                    prov_config=self.time_agnostic_library_config,
                    counter_handler=self.counter_handler,
                    base_iri=self.base_iri,
                    prefix=self.supplier_prefix,
                    valid_dois_cache=self.valid_dois_cache,
                    settings=settings,
                    silencer=self.silencer,
                    meta_config_path=meta_config_path,
                    timer=self.timer,
                )
                # Output CSV name: original name plus a second-precision timestamp.
                name = f"{filename.replace('.csv', '')}_{datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}"
                curator_obj.curator(
                    filename=name, path_csv=self.output_csv_dir
                )
                self.timer.record_metric("curated_records", len(curator_obj.data))

                with self.timer.timer("rdf_creation"):
                    local_g_size = len(curator_obj.everything_everywhere_allatonce)
                    self.timer.record_metric("local_g_triples", local_g_size)
                    preexisting_count = len(curator_obj.preexisting_entities)
                    self.timer.record_metric("preexisting_entities_count", preexisting_count)

                with self.timer.timer("creator_execution"):
                    creator_obj = Creator(
                        data=curator_obj.data,
                        finder=curator_obj.finder,
                        base_iri=self.base_iri,
                        counter_handler=self.counter_handler,
                        supplier_prefix=self.supplier_prefix,
                        resp_agent=self.resp_agent,
                        ra_index=curator_obj.index_id_ra,
                        br_index=curator_obj.index_id_br,
                        re_index_csv=curator_obj.re_index,
                        ar_index_csv=curator_obj.ar_index,
                        vi_index=curator_obj.VolIss,
                        # NOTE(review): reads silencer from *settings* here but
                        # from self.silencer above — confirm the asymmetry.
                        silencer=settings.get("silencer", []),
                    )
                    creator = creator_obj.creator(source=self.source)
                    self.timer.record_metric("entities_created", len(creator.res_to_entity))

                with self.timer.timer("provenance_generation"):
                    prov = ProvSet(
                        creator,
                        self.base_iri,
                        wanted_label=False,
                        supplier_prefix=self.supplier_prefix,
                        custom_counter_handler=self.counter_handler,
                    )
                    modified_entities = prov.generate_provenance()
                    self.timer.record_metric("modified_entities", len(modified_entities))

                repok = Reporter(print_sentences=False)
                reperr = Reporter(print_sentences=True, prefix="[Storer: ERROR] ")
                # Storer for the data graph (uses default_dir); serialized as
                # JSON-LD, optionally zipped.
                res_storer = Storer(
                    abstract_set=creator,
                    repok=repok,
                    reperr=reperr,
                    context_map={},
                    dir_split=self.dir_split_number,
                    n_file_item=self.items_per_file,
                    default_dir=self.default_dir,
                    output_format="json-ld",
                    zip_output=self.zip_output_rdf,
                    modified_entities=modified_entities,
                )
                # Storer for the provenance graph (no default_dir).
                prov_storer = Storer(
                    abstract_set=prov,
                    repok=repok,
                    reperr=reperr,
                    context_map={},
                    dir_split=self.dir_split_number,
                    n_file_item=self.items_per_file,
                    output_format="json-ld",
                    zip_output=self.zip_output_rdf,
                    modified_entities=modified_entities,
                )
                self.store_data_and_prov(res_storer, prov_storer)

            return {"message": "success"}, cache_path, errors_path, filename
        except Exception as e:
            tb = traceback.format_exc()
            template = (
                "An exception of type {0} occurred. Arguments:\n{1!r}\nTraceback:\n{2}"
            )
            message = template.format(type(e).__name__, e.args, tb)
            # Bulk-load failures are fatal: report and propagate.
            if isinstance(e, BulkLoadError):
                print(message)
                raise
            return {"message": message}, cache_path, errors_path, filename

    def _setup_output_directories(self) -> None:
        """Create output directories for data and provenance."""
        os.makedirs(self.data_update_dir, exist_ok=True)
        os.makedirs(self.prov_update_dir, exist_ok=True)

    def _upload_sparql_queries(self) -> None:
        """Upload SPARQL queries to triplestores in parallel using fork.

        Data and provenance uploads run in two forked child processes; a
        non-zero exit code from either child aborts the run.
        """
        data_upload_folder = os.path.join(self.data_update_dir, "to_be_uploaded")
        prov_upload_folder = os.path.join(self.prov_update_dir, "to_be_uploaded")

        # fork (not spawn): children inherit the Storer state without pickling.
        ctx = multiprocessing.get_context('fork')

        data_process = ctx.Process(
            target=_upload_to_triplestore,
            args=(
                self.triplestore_url,
                data_upload_folder,
                self.redis_host,
                self.redis_port,
                self.redis_cache_db,
                self.ts_failed_queries,
                self.ts_stop_file,
                "Uploading data SPARQL"
            )
        )

        prov_process = ctx.Process(
            target=_upload_to_triplestore,
            args=(
                self.provenance_triplestore_url,
                prov_upload_folder,
                self.redis_host,
                self.redis_port,
                self.redis_cache_db,
                self.ts_failed_queries,
                self.ts_stop_file,
                "Uploading prov SPARQL"
            )
        )

        data_process.start()
        prov_process.start()

        data_process.join()
        prov_process.join()

        if data_process.exitcode != 0:
            raise RuntimeError(f"Data upload failed with exit code {data_process.exitcode}")
        if prov_process.exitcode != 0:
            raise RuntimeError(f"Provenance upload failed with exit code {prov_process.exitcode}")

    def store_data_and_prov(
        self, res_storer: Storer, prov_storer: Storer
    ) -> None:
        """Orchestrate storage and upload using appropriate strategy."""
        self._setup_output_directories()

        # Virtuoso bulk load is opt-in; otherwise plain SPARQL updates are used.
        bulk_config = self.settings.get("virtuoso_bulk_load", {})
        if bulk_config.get("enabled", False):
            self._store_and_upload(res_storer, prov_storer, self.timer, bulk_config)
        else:
            self._store_and_upload(res_storer, prov_storer, self.timer)

    def _store_and_upload(
        self, res_storer: Storer, prov_storer: Storer, timer: ProcessTimer, bulk_config: Optional[dict] = None
    ) -> None:
        """Store RDF files and upload queries to triplestore with parallel execution.

        When *bulk_config* is given, N-Quads files are prepared and loaded
        into Virtuoso via Docker bulk load instead of per-query uploads.
        """
        use_bulk_load = bulk_config is not None

        if use_bulk_load:
            data_nquads_dir = os.path.abspath(bulk_config["data_mount_dir"])
            prov_nquads_dir = os.path.abspath(bulk_config["prov_mount_dir"])
            os.makedirs(data_nquads_dir, exist_ok=True)
            os.makedirs(prov_nquads_dir, exist_ok=True)
        else:
            data_nquads_dir = None
            prov_nquads_dir = None

        with timer.timer("storage"):
            # fork so child processes inherit the in-memory graphs directly.
            ctx = multiprocessing.get_context('fork')
            rdf_store_processes = []

            # RDF file dumps run concurrently with query generation; they are
            # only joined at the very end of this method.
            if self.generate_rdf_files:
                data_store_process = ctx.Process(
                    target=_store_rdf_worker,
                    args=(res_storer, self.output_rdf_dir, self.base_iri, self.context_path)
                )
                prov_store_process = ctx.Process(
                    target=_store_rdf_worker,
                    args=(prov_storer, self.output_rdf_dir, self.base_iri, self.context_path)
                )
                rdf_store_processes = [data_store_process, prov_store_process]
                for p in rdf_store_processes:
                    p.start()

            # Query generation: saves SPARQL files (normal mode) or prepares
            # N-Quads for bulk load (bulk mode) — the two flags are inverses.
            data_query_process = ctx.Process(
                target=_generate_queries_worker,
                args=(res_storer, self.triplestore_url, self.data_update_dir,
                      not use_bulk_load, use_bulk_load, data_nquads_dir)
            )
            prov_query_process = ctx.Process(
                target=_generate_queries_worker,
                args=(prov_storer, self.provenance_triplestore_url, self.prov_update_dir,
                      not use_bulk_load, use_bulk_load, prov_nquads_dir)
            )
            data_query_process.start()
            prov_query_process.start()
            data_query_process.join()
            prov_query_process.join()

            if data_query_process.exitcode != 0:
                raise RuntimeError(f"Data query generation failed with exit code {data_query_process.exitcode}")
            if prov_query_process.exitcode != 0:
                raise RuntimeError(f"Prov query generation failed with exit code {prov_query_process.exitcode}")

            self._upload_sparql_queries()

            if use_bulk_load:
                _run_bulk_load_process((bulk_config["data_container"], bulk_config["bulk_load_dir"], data_nquads_dir))
                _run_bulk_load_process((bulk_config["prov_container"], bulk_config["bulk_load_dir"], prov_nquads_dir))
                # Loaded N-Quads are no longer needed; free the disk space.
                for nq_file in glob.glob(os.path.join(data_nquads_dir, "*.nq.gz")):
                    os.remove(nq_file)
                for nq_file in glob.glob(os.path.join(prov_nquads_dir, "*.nq.gz")):
                    os.remove(nq_file)

            for p in rdf_store_processes:
                p.join()
                if p.exitcode != 0:
                    raise RuntimeError(f"RDF storage failed with exit code {p.exitcode}")

    def run_sparql_updates(self, endpoint: str, folder: str) -> None:
        """Synchronously upload the SPARQL update files in *folder* to *endpoint*."""
        upload_sparql_updates(
            endpoint=endpoint,
            folder=folder,
            failed_file=self.ts_failed_queries,
            stop_file=self.ts_stop_file,
            redis_host=self.redis_host,
            redis_port=self.redis_port,
            redis_db=self.redis_cache_db,
        )

455 

456 

def _save_incremental_report(all_reports: List[Dict[str, Any]], meta_config_path: str, output_path: str) -> None:
    """Persist the per-file timing reports plus aggregate stats as JSON."""
    payload = {
        "timestamp": datetime.now().isoformat(),
        "config_path": meta_config_path,
        "total_files_processed": len(all_reports),
        "files": all_reports,
        "aggregate": _compute_aggregate_metrics(all_reports),
    }
    with open(output_path, 'w') as report_file:
        json.dump(payload, report_file, indent=2)

468 

469 

def _compute_aggregate_metrics(all_reports: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Compute aggregate statistics across all file reports.

    :param all_reports: list of ``{"filename": ..., "report": {"metrics": ...}}``
        entries collected per processed file.
    :returns: empty dict when there are no reports; otherwise totals,
        averages and min/max over the per-file timing metrics.
    """
    if not all_reports:
        return {}

    # Extract the metrics dicts once instead of re-walking the structure
    # for every aggregate.
    metrics = [r["report"]["metrics"] for r in all_reports]
    durations = [m.get("total_duration_seconds", 0) for m in metrics]
    throughputs = [m.get("throughput_records_per_sec", 0) for m in metrics]
    total_duration = sum(durations)
    total_records = sum(m.get("input_records", 0) for m in metrics)
    total_entities = sum(m.get("entities_created", 0) for m in metrics)

    # all_reports / durations / throughputs are guaranteed non-empty here
    # (early return above), so the former "if ... else 0" guards were dead
    # code and have been removed.
    return {
        "total_files": len(all_reports),
        "total_duration_seconds": round(total_duration, 3),
        "total_records_processed": total_records,
        "total_entities_created": total_entities,
        "average_time_per_file": round(total_duration / len(all_reports), 3),
        "average_throughput": round(sum(throughputs) / len(throughputs), 2),
        "min_time": round(min(durations), 3),
        "max_time": round(max(durations), 3),
        "overall_throughput": round(total_records / total_duration, 2) if total_duration > 0 else 0
    }

493 

494 

def _print_aggregate_summary(all_reports: List[Dict[str, Any]]) -> None:
    """Print a human-readable aggregate summary of all processed files."""
    agg = _compute_aggregate_metrics(all_reports)
    divider = "=" * 60

    print(f"\n{divider}")
    print("Aggregate Timing Summary")
    print(divider)
    print(f"Total Files: {agg['total_files']}")
    print(f"Total Duration: {agg['total_duration_seconds']}s")
    print(f"Total Records: {agg['total_records_processed']}")
    print(f"Total Entities: {agg['total_entities_created']}")
    print(f"Average Time/File: {agg['average_time_per_file']}s")
    print(f"Min/Max Time: {agg['min_time']}s / {agg['max_time']}s")
    print(f"Overall Throughput: {agg['overall_throughput']} rec/s")
    print(f"{divider}\n")

510 

511 

def run_meta_process(
    settings: dict, meta_config_path: str, enable_timing: bool = False, timing_output: Optional[str] = None
) -> None:
    """Run the full Meta workflow over every pending input CSV file.

    :param settings: parsed YAML configuration.
    :param meta_config_path: path of the YAML config file.
    :param enable_timing: collect and print per-file timing metrics.
    :param timing_output: optional JSON path for the (incrementally updated)
        timing report; a companion ``*_chart.png`` is also refreshed.
    """
    is_unix = platform in {"linux", "linux2", "darwin"}
    all_reports = []

    # A throwaway instance, only used for folder preparation and shared paths.
    meta_process_setup = MetaProcess(settings=settings, meta_config_path=meta_config_path)
    files_to_be_processed = meta_process_setup.prepare_folders()

    # Write gently_run / gently_stop helper scripts and clear a stale .stop.
    generate_gentle_buttons(meta_process_setup.base_output_dir, meta_config_path, is_unix)

    with tqdm(total=len(files_to_be_processed), desc="Processing files") as progress_bar:
        for idx, filename in enumerate(files_to_be_processed, 1):
            try:
                # A ".stop" marker in the output dir requests a graceful halt.
                if os.path.exists(os.path.join(meta_process_setup.base_output_dir, ".stop")):
                    print("\nStop file detected. Halting processing.")
                    break

                if enable_timing:
                    # NOTE(review): "(unknown)" looks like a redacted
                    # placeholder — presumably this should interpolate
                    # {filename}; confirm against upstream history.
                    print(f"\n[{idx}/{len(files_to_be_processed)}] Processing (unknown)...")

                # Fresh MetaProcess per file: per-file state (e.g. info_dir
                # mutation in curate_and_create) must not leak across files.
                file_timer = ProcessTimer(enabled=enable_timing, verbose=enable_timing)
                meta_process = MetaProcess(settings=settings, meta_config_path=meta_config_path, timer=file_timer)

                result = meta_process.curate_and_create(
                    filename,
                    meta_process.cache_path,
                    meta_process.errors_path,
                    settings=settings,
                    meta_config_path=meta_config_path
                )
                # Record success in the cache file or the error in errors.txt.
                task_done(result)

                if enable_timing:
                    report = file_timer.get_report()
                    all_reports.append({
                        "filename": filename,
                        "report": report
                    })

                    file_timer.print_file_summary(filename)

                    if timing_output:
                        # Re-save the cumulative report after every file so a
                        # crash never loses the collected timings.
                        _save_incremental_report(all_reports, meta_config_path, timing_output)
                        print(f"\n JSON updated: {timing_output}")

                        chart_file = timing_output.replace('.json', '_chart.png') if timing_output else 'meta_process_timing_chart.png'
                        plot_incremental_progress(all_reports, chart_file)
                        print(f" Chart updated: {chart_file}\n")

            except Exception as e:
                # Per-file failures are reported and skipped; a BulkLoadError
                # raised out of curate_and_create is also caught here.
                traceback_str = traceback.format_exc()
                # NOTE(review): "(unknown)" — see placeholder note above.
                print(
                    f"Error processing file (unknown): {e}\nTraceback:\n{traceback_str}"
                )
            finally:
                progress_bar.update(1)

    # Normal completion (no stop request): archive the cache with a timestamp
    # and clean up any leftover .lock files.
    if not os.path.exists(os.path.join(meta_process_setup.base_output_dir, ".stop")):
        if os.path.exists(meta_process_setup.cache_path):
            os.rename(
                meta_process_setup.cache_path,
                meta_process_setup.cache_path.replace(
                    ".txt", f'_{datetime.now().strftime("%Y-%m-%dT%H_%M_%S_%f")}.txt'
                ),
            )
        if is_unix:
            delete_lock_files(base_dir=meta_process_setup.base_output_dir)

    if enable_timing and all_reports:
        _print_aggregate_summary(all_reports)
        if timing_output:
            # Final report overwrite; mirrors _save_incremental_report but
            # uses the key "total_files" instead of "total_files_processed".
            aggregate_report = {
                "timestamp": datetime.now().isoformat(),
                "config_path": meta_config_path,
                "total_files": len(all_reports),
                "files": all_reports,
                "aggregate": _compute_aggregate_metrics(all_reports)
            }
            with open(timing_output, 'w') as f:
                json.dump(aggregate_report, f, indent=2)
            print(f"[Timing] Report saved to {timing_output}")

594 

595 

def task_done(task_output: tuple) -> None:
    """Record the outcome of one processed file.

    On success, the filename is appended to the cache file (kept numerically
    sorted when the names are numeric ``*.csv`` names); on failure, the
    error message is appended to the errors file; "skip" results are ignored.
    """
    message, cache_path, errors_path, filename = task_output
    status = message["message"]

    if status == "skip":
        return

    if status == "success":
        if not os.path.exists(cache_path):
            # First completion: create the cache with this single entry.
            with open(cache_path, "w", encoding="utf-8") as cache_file:
                cache_file.write(filename + "\n")
            return
        with open(cache_path, "r", encoding="utf-8") as cache_file:
            entries = cache_file.read().splitlines()
        entries.append(filename)
        try:
            # Numeric CSV names ("12.csv") are kept in ascending order.
            entries = sorted(
                entries,
                key=lambda name: int(name.replace(".csv", "")),
                reverse=False,
            )
        except ValueError:
            pass  # non-numeric names: keep insertion order
        with open(cache_path, "w", encoding="utf-8") as cache_file:
            cache_file.write("\n".join(entries))
        return

    # Any other message is an error report.
    with open(errors_path, "a", encoding="utf-8") as errors_file:
        errors_file.write(f'(unknown): {status}' + "\n")

621 

622 

def chunks(lst: list, n: int) -> Iterator[list]:
    """Yield successive n-sized chunks from lst."""
    start = 0
    while start < len(lst):
        yield lst[start:start + n]
        start += n

627 

628 

def delete_lock_files(base_dir: str) -> None:
    """Recursively remove every ``.lock`` file under *base_dir*.

    :param base_dir: root directory of the tree to clean. (The previous
        annotation ``list`` was wrong — callers pass a path string.)
    """
    for dirpath, _, filenames in os.walk(base_dir):
        for filename in filenames:
            if filename.endswith(".lock"):
                os.remove(os.path.join(dirpath, filename))

634 

635 

def generate_gentle_buttons(dir: str, config: str, is_unix: bool):
    """Write helper scripts to gently stop/restart the process.

    Removes a stale ``.stop`` marker in *dir*, then writes ``gently_run``
    and ``gently_stop`` scripts (``.sh`` on Unix, ``.bat`` otherwise) into
    the current working directory.
    """
    stop_marker = os.path.join(dir, ".stop")
    if os.path.exists(stop_marker):
        os.remove(stop_marker)
    ext = "sh" if is_unix else "bat"
    run_commands = (
        f'{executable} -m oc_meta.lib.stopper -t "{dir}" --remove\n'
        f'{executable} -m oc_meta.run.meta_process -c {config}'
    )
    with open(f"gently_run.{ext}", "w") as script_file:
        script_file.write(run_commands)
    with open(f"gently_stop.{ext}", "w") as script_file:
        script_file.write(f'{executable} -m oc_meta.lib.stopper -t "{dir}" --add')

646 

647 

if __name__ == "__main__":  # pragma: no cover
    # Command-line entry point: parse arguments, load the YAML configuration
    # and run the full Meta processing workflow.
    arg_parser = ArgumentParser(
        "meta_process.py",
        description="This script runs the OCMeta data processing workflow",
    )
    arg_parser.add_argument(
        "-c",
        "--config",
        dest="config",
        required=True,
        help="Configuration file directory",
    )
    arg_parser.add_argument(
        "--timing",
        action="store_true",
        help="Enable timing metrics collection and display summary at the end",
    )
    arg_parser.add_argument(
        "--timing-output",
        dest="timing_output",
        default=None,
        help="Optional path to save timing report as JSON file",
    )
    args = arg_parser.parse_args()
    # yaml.full_load supports only standard YAML tags (no arbitrary Python
    # objects); the config is expected to be a trusted local file.
    with open(args.config, encoding="utf-8") as file:
        settings = yaml.full_load(file)
    run_meta_process(
        settings=settings,
        meta_config_path=args.config,
        enable_timing=args.timing,
        timing_output=args.timing_output
    )