Coverage for oc_meta / run / meta / check_results.py: 91%

379 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import multiprocessing 

7import os 

8import sys 

9import zipfile 

10from concurrent.futures import ProcessPoolExecutor, as_completed 

11from dataclasses import dataclass, field 

12from datetime import datetime 

13from typing import Callable, Dict, List, Set 

14 

15import orjson 

16import polars as pl 

17import yaml 

18from rich.progress import BarColumn, Progress, TaskProgressColumn, TextColumn, TimeElapsedColumn 

19from rich_argparse import RichHelpFormatter 

20from oc_meta.constants import QLEVER_BATCH_SIZE, QLEVER_MAX_WORKERS 

21from oc_meta.lib.cleaner import normalize_hyphens, normalize_id 

22from oc_meta.lib.console import EMATimeRemainingColumn, console 

23from oc_meta.lib.file_manager import collect_files 

24from oc_meta.lib.master_of_regex import RE_ENTITY_URI, RE_SEMICOLON_IN_PEOPLE_FIELD, split_name_and_ids 

25from oc_meta.lib.sparql import run_queries_parallel 

26 

# Maximum number of attempts for a failing SPARQL query before giving up.
MAX_RETRIES = 10
# Exponential-backoff multiplier applied between SPARQL retry attempts.
RETRY_BACKOFF = 2
# Namespace prefix stripped from identifier-scheme URIs returned by the triplestore.
DATACITE_PREFIX = "http://purl.org/spar/datacite/"

# Exotic whitespace characters (tab, NBSP, zero-width and typographic spaces)
# normalized to a plain space before parsing CSV cells.
_SPACE_PATTERN = '[\t\xa0\u200b\u202f\u2003\u2005\u2009]'
# CSV columns that may contain identifiers to verify.
_ID_COLUMNS = ['id', 'author', 'editor', 'publisher', 'venue']
# Per-file integer counters aggregated into the final report summary;
# names must match the integer fields of FileResult.
_STAT_FIELDS = (
    'total_rows', 'rows_with_ids', 'total_identifiers', 'omid_schema_identifiers',
    'identifiers_skipped_invalid',
    'identifiers_with_omids', 'identifiers_without_omids', 'data_graphs_found',
    'data_graphs_missing', 'prov_graphs_found', 'prov_graphs_missing',
    'omids_with_provenance', 'omids_without_provenance',
)

40 

41 

@dataclass
class FileResult:
    """Accumulated check results for a single input CSV file.

    The integer counter fields mirror the names in ``_STAT_FIELDS`` so they
    can be read generically with ``getattr`` when building the report.
    """

    file: str  # basename of the processed CSV file
    total_rows: int = 0  # rows read from the CSV
    rows_with_ids: int = 0  # rows that yielded at least one identifier
    total_identifiers: int = 0  # identifiers extracted across all columns
    omid_schema_identifiers: int = 0  # identifiers already using the omid scheme
    identifiers_skipped_invalid: int = 0  # identifiers rejected by normalize_id
    identifiers_with_omids: int = 0  # occurrences resolved to at least one OMID
    identifiers_without_omids: int = 0  # occurrences with no OMID in the triplestore
    data_graphs_found: int = 0  # OMIDs found in their on-disk data zip
    data_graphs_missing: int = 0  # OMIDs absent from their on-disk data zip
    prov_graphs_found: int = 0  # OMIDs found in the paired provenance zip
    prov_graphs_missing: int = 0  # OMIDs absent from the paired provenance zip
    omids_with_provenance: int = 0  # OMIDs with a snapshot in the prov triplestore
    omids_without_provenance: int = 0  # OMIDs lacking a provenance snapshot
    errors: list = field(default_factory=list)  # error dicts for the JSON report
    id_key_to_omids: dict = field(default_factory=dict)  # 'schema:value' -> set of OMIDs
    id_key_locations: dict = field(default_factory=dict)  # 'schema:value' -> occurrence dicts

61 

62 

63 

64 

def check_provenance_existence(omids: List[str], prov_endpoint_url: str, workers: int = QLEVER_MAX_WORKERS, progress_callback: Callable[[int], None] | None = None) -> Dict[str, bool]:
    """Check which OMIDs have a first provenance snapshot in the triplestore.

    :param omids: OMID URIs to check
    :param prov_endpoint_url: SPARQL endpoint of the provenance triplestore
    :param workers: maximum number of parallel batch queries
    :param progress_callback: invoked with each completed batch size, if given
    :return: mapping omid -> True when its ``/prov/se/1`` snapshot exists
    """
    if not omids:
        return {}

    # Start pessimistic: every OMID is assumed to lack provenance until seen.
    has_snapshot: Dict[str, bool] = dict.fromkeys(omids, False)

    queries: list = []
    sizes: list = []
    for start in range(0, len(omids), QLEVER_BATCH_SIZE):
        chunk = omids[start:start + QLEVER_BATCH_SIZE]
        values_entries = " ".join(f"<{omid}/prov/se/1>" for omid in chunk)
        queries.append(f"""
        SELECT ?snapshot WHERE {{
            VALUES ?snapshot {{ {values_entries} }}
            ?snapshot <http://www.w3.org/ns/prov#specializationOf> ?o .
        }}
        """)
        sizes.append(len(chunk))

    all_bindings = run_queries_parallel(prov_endpoint_url, queries, sizes, workers, progress_callback, max_retries=MAX_RETRIES, backoff_factor=RETRY_BACKOFF)

    # Every snapshot that came back proves provenance for its base OMID.
    for bindings in all_bindings:
        for binding in bindings:
            snapshot_uri = binding["snapshot"]["value"]
            base_omid = snapshot_uri.rsplit("/prov/se/1", 1)[0]
            has_snapshot[base_omid] = True

    return has_snapshot

94 

95 

def check_omids_existence(identifiers: List[Dict[str, str]], endpoint_url: str, workers: int = QLEVER_MAX_WORKERS, progress_callback: Callable[[int], None] | None = None) -> Dict[str, Set[str]]:
    """Resolve external identifiers to the OMIDs that carry them.

    :param identifiers: dicts with ``'schema'`` and ``'value'`` keys
    :param endpoint_url: SPARQL endpoint of the data triplestore
    :param workers: maximum number of parallel batch queries
    :param progress_callback: invoked with each completed batch size, if given
    :return: mapping ``'<schema>:<value>'`` -> set of OMID URIs; identifiers
        that were not found are simply absent from the result
    """
    if not identifiers:
        return {}

    found_omids: Dict[str, Set[str]] = {}

    batch_queries = []
    batch_sizes = []
    for i in range(0, len(identifiers), QLEVER_BATCH_SIZE):
        batch = identifiers[i:i + QLEVER_BATCH_SIZE]

        values_entries = []
        for identifier in batch:
            # Escape backslashes and quotes so the value is a valid SPARQL string literal.
            escaped_value = identifier['value'].replace('\\', '\\\\').replace('"', '\\"')
            values_entries.append(f'("{escaped_value}"^^xsd:string datacite:{identifier["schema"]})')

        query = f"""
        PREFIX datacite: <http://purl.org/spar/datacite/>
        PREFIX literal: <http://www.essepuntato.it/2010/06/literalreification/>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        SELECT ?val ?scheme ?omid
        WHERE {{
            VALUES (?val ?scheme) {{ {" ".join(values_entries)} }}
            ?omid literal:hasLiteralValue ?val ;
                  datacite:usesIdentifierScheme ?scheme .
        }}
        """
        batch_queries.append(query)
        batch_sizes.append(len(batch))

    all_bindings = run_queries_parallel(endpoint_url, batch_queries, batch_sizes, workers, progress_callback, max_retries=MAX_RETRIES, backoff_factor=RETRY_BACKOFF)

    for bindings in all_bindings:
        for result in bindings:
            omid = result["omid"]["value"]
            val = result["val"]["value"]
            # The scheme comes back as a full datacite URI; keep only its local name.
            # removeprefix leaves the string untouched when the prefix is absent,
            # matching the previous conditional-slice behavior.
            scheme = result["scheme"]["value"].removeprefix(DATACITE_PREFIX)
            id_key = f"{scheme}:{val}"
            found_omids.setdefault(id_key, set()).add(omid)

    return found_omids

141 

142 

def find_file(rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str, zip_output_rdf: bool) -> str|None:
    """Compute the on-disk RDF file path that should contain *uri*.

    :param rdf_dir: root directory of the RDF output tree
    :param dir_split_number: number of entities grouped per directory
    :param items_per_file: number of entities grouped per file
    :param uri: entity URI to locate
    :param zip_output_rdf: True when files are stored zipped
    :return: the expected file path, or None when *uri* is not an entity URI
    """
    match = RE_ENTITY_URI.match(uri)
    if not match:
        return None

    entity_number = int(match.group('entity_number'))
    # Entities are bucketed first by directory, then by file, in fixed-size
    # ranges whose upper bound names the directory/file.
    file_bucket = ((entity_number - 1) // items_per_file + 1) * items_per_file
    dir_bucket = ((entity_number - 1) // dir_split_number + 1) * dir_split_number

    folder = os.path.join(
        rdf_dir,
        match.group('short_name'),
        match.group('supplier_prefix'),
        str(dir_bucket),
    )
    suffix = '.zip' if zip_output_rdf else '.json'
    return os.path.join(folder, str(file_bucket)) + suffix

156 

157 

158def find_prov_file(data_zip_path: str) -> str|None: 

159 base_dir = os.path.dirname(data_zip_path) 

160 file_name = os.path.splitext(os.path.basename(data_zip_path))[0] 

161 prov_dir = os.path.join(base_dir, file_name, 'prov') 

162 prov_file = os.path.join(prov_dir, 'se.zip') 

163 return prov_file if os.path.exists(prov_file) else None 

164 

165 

def _read_first_json_member(zip_path: str) -> bytes:
    """Return the raw bytes of the first ``.json`` member of *zip_path* (b'' if none)."""
    with zipfile.ZipFile(zip_path, 'r') as archive:
        json_members = [name for name in archive.namelist() if name.endswith('.json')]
        if not json_members:
            return b''
        with archive.open(json_members[0]) as member:
            return member.read()


def _check_zip_file(args: tuple) -> tuple[str, dict[str, tuple[bool, bool]]]:
    """Check which OMIDs appear in a data zip and its paired provenance zip.

    Runs in a worker process, so arguments arrive as a single picklable tuple
    ``(zip_path, omids)``.

    :return: ``(zip_path, {omid: (found_in_data, found_in_prov)})``. Presence
        is tested with a cheap byte-substring scan of the serialized JSON
        (``omid"`` for data, ``omid/prov/"`` for provenance) instead of a
        full JSON parse.
    """
    zip_path, omids = args

    raw_data = _read_first_json_member(zip_path)

    prov_path = find_prov_file(zip_path)
    raw_prov = _read_first_json_member(prov_path) if prov_path else b''

    results: dict[str, tuple[bool, bool]] = {}
    for omid in omids:
        results[omid] = (
            (omid + '"').encode() in raw_data,
            (omid + '/prov/"').encode() in raw_prov,
        )
    return zip_path, results

190 

191 

def _parse_id_token(token: str) -> tuple[str, str] | None:
    """Split a ``schema:value`` token into (lowercased schema, hyphen-normalized value), or None."""
    colon_pos = token.find(':')
    if colon_pos <= 0:
        # No colon, or empty schema part: not a parseable identifier token.
        return None
    return token[:colon_pos].lower(), normalize_hyphens(token[colon_pos + 1:])


def _extract_id_pairs(cell: str, col: str) -> list[tuple[str, str]]:
    """Extract (schema, value) identifier pairs from one CSV cell.

    :param cell: raw cell text
    :param col: column name; the ``'id'`` column holds space-separated tokens
        directly, while people/venue columns hold semicolon-separated
        ``Name [id1 id2]`` entries whose ids must be split out first
    :return: all parseable identifier pairs, in order of appearance
    """
    pairs: list[tuple[str, str]] = []
    if col == 'id':
        for token in cell.strip().split():
            pair = _parse_id_token(token)
            if pair:
                pairs.append(pair)
    else:
        for element in RE_SEMICOLON_IN_PEOPLE_FIELD.split(cell):
            _, ids_str = split_name_and_ids(element)
            if not ids_str:
                continue
            for token in ids_str.strip().split():
                pair = _parse_id_token(token)
                if pair:
                    pairs.append(pair)
    return pairs

208 

209 

def process_csv_file(args: tuple, workers: int = QLEVER_MAX_WORKERS, progress=None, task_id=None) -> FileResult:
    """Run the full five-phase check for one CSV file.

    Phases:
      1. Read the CSV and extract every identifier from the id/people/venue columns.
      2. Query the data triplestore to resolve identifiers to OMIDs.
      3. Map resolved OMIDs to the on-disk RDF zip files that should hold them,
         recording missing-OMID errors.
      4. Scan those zip files for the data and provenance serializations
         (parallelized across processes when workers > 1).
      5. Collect the provenance-triplestore results (the query is launched in a
         separate process before phase 4 so it overlaps the disk scan).

    :param args: picklable tuple of (csv_file, endpoint_url, prov_endpoint_url,
        rdf_dir, dir_split_number, items_per_file, zip_output_rdf)
    :param workers: maximum parallel SPARQL workers for this file
    :param progress: optional rich Progress for per-phase sub-tasks
    :param task_id: id of the parent progress task (single-file mode only)
    :return: a populated FileResult for this CSV
    """
    csv_file, endpoint_url, prov_endpoint_url, rdf_dir, dir_split_number, items_per_file, zip_output_rdf = args

    result = FileResult(file=os.path.basename(csv_file))

    if progress and task_id is not None:
        progress.update(task_id, detail="Phase 1/5: Reading CSV")

    # Read raw bytes and strip NUL characters, which would break CSV parsing.
    with open(csv_file, 'rb') as f:
        raw = f.read()
    df = pl.read_csv(raw.replace(b'\0', b''), columns=_ID_COLUMNS, infer_schema_length=0)
    # Normalize exotic whitespace and literal '&nbsp;' sequences in every id column.
    df = df.with_columns([
        pl.col(c)
        .str.replace_all(_SPACE_PATTERN, ' ')
        .str.replace_all('&nbsp;', ' ', literal=True)
        for c in _ID_COLUMNS
    ])

    result.total_rows = len(df)
    col_lists = {col: df[col].to_list() for col in _ID_COLUMNS}
    del df  # release the dataframe early; only the plain Python lists are needed

    unique_id_keys: set[str] = set()
    id_key_meta: dict[str, tuple[str, str]] = {}  # id_key -> (schema, value)
    id_key_occurrences: dict[str, list[tuple[int, str]]] = {}  # id_key -> [(row, column)]
    omid_values: list[str] = []  # full OMID URIs found directly in the CSV

    phase1_task = None
    if progress:
        phase1_task = progress.add_task(" Phase 1/5: Extracting identifiers", total=result.total_rows, detail="")

    for row_idx in range(result.total_rows):
        row_has_ids = False
        for col in _ID_COLUMNS:
            cell = col_lists[col][row_idx]
            if not cell:
                continue

            pairs = _extract_id_pairs(cell, col)
            if not pairs:
                continue

            row_has_ids = True
            row_num = row_idx + 1  # 1-based row number for error reporting
            result.total_identifiers += len(pairs)

            for schema, value in pairs:
                if schema == 'omid':
                    # omid-scheme identifiers are not resolved via SPARQL; URI
                    # values are checked directly in the provenance phase.
                    result.omid_schema_identifiers += 1
                    if value.startswith('http'):
                        omid_values.append(value)
                    continue
                normalized = normalize_id(f"{schema}:{value}")
                if not normalized:
                    result.identifiers_skipped_invalid += 1
                    continue
                norm_schema, norm_value = normalized.split(':', 1)
                id_key = normalized
                unique_id_keys.add(id_key)
                if id_key not in id_key_meta:
                    id_key_meta[id_key] = (norm_schema, norm_value)
                if id_key not in id_key_occurrences:
                    id_key_occurrences[id_key] = []
                id_key_occurrences[id_key].append((row_num, col))

        if row_has_ids:
            result.rows_with_ids += 1
        if progress and phase1_task is not None:
            progress.advance(phase1_task)

    if progress and phase1_task is not None:
        progress.update(phase1_task, visible=False)

    del col_lists

    # Deduplicated identifiers to resolve; one entry per unique id_key.
    all_identifiers = [{'schema': sv[0], 'value': sv[1]} for sv in id_key_meta.values()]
    total_ids = len(all_identifiers)

    phase2_task = None
    if progress:
        phase2_task = progress.add_task(" Phase 2/5: Querying DB", total=total_ids, detail="")

    def on_id_batch(batch_size: int):
        # Advance the phase-2 bar as each SPARQL batch completes.
        if progress and phase2_task is not None:
            progress.advance(phase2_task, batch_size)

    identifier_cache = check_omids_existence(all_identifiers, endpoint_url, workers=workers, progress_callback=on_id_batch)

    if progress and phase2_task is not None:
        progress.update(phase2_task, visible=False)

    omids_by_file: dict[str, set[str]] = {}  # RDF zip path -> OMIDs expected inside it
    all_omids: set[str] = set()  # OMIDs whose provenance will be queried
    omid_to_id_info: dict[str, tuple[int, str, str]] = {}  # omid -> (row, column, id_key)
    path_exists_cache: dict[str, bool] = {}  # memoized os.path.exists results
    csv_basename = os.path.basename(csv_file)

    phase3_task = None
    if progress:
        phase3_task = progress.add_task(" Phase 3/5: Mapping OMIDs", total=len(unique_id_keys), detail="")

    for id_key in unique_id_keys:
        occurrences = id_key_occurrences[id_key]
        if id_key in identifier_cache:
            result.identifiers_with_omids += len(occurrences)
            omids = identifier_cache[id_key]
            result.id_key_to_omids[id_key] = omids
            result.id_key_locations[id_key] = [
                {'file': csv_basename, 'row': r, 'column': c}
                for r, c in occurrences
            ]
            # NOTE(review): only one arbitrary OMID per identifier enters the
            # provenance-check set, while ALL its OMIDs are mapped to RDF
            # files below — presumably one representative suffices for the
            # triplestore check; confirm this asymmetry is intentional.
            all_omids.add(next(iter(omids)))
            for omid in omids:
                if omid not in omid_to_id_info:
                    omid_to_id_info[omid] = (occurrences[0][0], occurrences[0][1], id_key)
                zip_path = find_file(rdf_dir, dir_split_number, items_per_file, omid, zip_output_rdf)
                if zip_path:
                    if zip_path in omids_by_file:
                        omids_by_file[zip_path].add(omid)
                    else:
                        # Only track zip files that actually exist on disk.
                        if zip_path not in path_exists_cache:
                            path_exists_cache[zip_path] = os.path.exists(zip_path)
                        if path_exists_cache[zip_path]:
                            omids_by_file[zip_path] = {omid}
        else:
            # Identifier never resolved: record one error per CSV occurrence.
            result.identifiers_without_omids += len(occurrences)
            schema, value = id_key_meta[id_key]
            for row_num, col in occurrences:
                result.errors.append({
                    "type": "missing_omid",
                    "schema": schema,
                    "value": value,
                    "file": csv_basename,
                    "row": row_num,
                    "column": col,
                })
        if progress and phase3_task is not None:
            progress.advance(phase3_task)

    if progress and phase3_task is not None:
        progress.update(phase3_task, visible=False)

    # OMID URIs taken verbatim from the CSV also get their provenance checked.
    for omid_uri in omid_values:
        all_omids.add(omid_uri)

    total_rdf_files = len(omids_by_file)
    total_omids = len(all_omids)

    # Launch the provenance SPARQL check in a dedicated process so it runs
    # concurrently with the local zip-file scan of phase 4.
    prov_future = None
    prov_executor = None
    if total_omids > 0:
        prov_executor = ProcessPoolExecutor(
            max_workers=1,
            mp_context=multiprocessing.get_context('forkserver')
        )
        prov_future = prov_executor.submit(
            check_provenance_existence, list(all_omids), prov_endpoint_url, workers
        )

    phase4_task = None
    if progress:
        phase4_task = progress.add_task(" Phase 4/5: Checking RDF files", total=total_rdf_files, detail="")

    zip_args = [(zp, list(omids)) for zp, omids in omids_by_file.items()]

    def _apply_zip_results(zip_results: dict[str, tuple[bool, bool]]) -> None:
        # Fold one zip file's per-OMID (data_found, prov_found) flags into counters.
        for omid, (data_found, prov_found) in zip_results.items():
            if data_found:
                result.data_graphs_found += 1
            else:
                result.data_graphs_missing += 1
            if prov_found:
                result.prov_graphs_found += 1
            else:
                result.prov_graphs_missing += 1

    if zip_args and workers > 1:
        with ProcessPoolExecutor(
            max_workers=min(len(zip_args), workers),
            mp_context=multiprocessing.get_context('forkserver')
        ) as executor:
            for future in as_completed(
                {executor.submit(_check_zip_file, a): a for a in zip_args}
            ):
                _zip_path, zip_results = future.result()
                _apply_zip_results(zip_results)
                if progress and phase4_task is not None:
                    progress.advance(phase4_task)
    else:
        # Sequential fallback when only one worker is available.
        for a in zip_args:
            _zip_path, zip_results = _check_zip_file(a)
            _apply_zip_results(zip_results)
            if progress and phase4_task is not None:
                progress.advance(phase4_task)

    if progress and phase4_task is not None:
        progress.update(phase4_task, visible=False)

    phase5_task = None
    if progress:
        phase5_task = progress.add_task(" Phase 5/5: Checking provenance", total=total_omids, detail="")

    # Collect the provenance results launched before phase 4.
    prov_results: Dict[str, bool] = {}
    if prov_future and prov_executor:
        prov_results = prov_future.result()
        prov_executor.shutdown(wait=False)

    if progress and phase5_task is not None:
        progress.advance(phase5_task, total_omids)
        progress.update(phase5_task, visible=False)

    for omid, has_prov in prov_results.items():
        if has_prov:
            result.omids_with_provenance += 1
        else:
            result.omids_without_provenance += 1
            if omid in omid_to_id_info:
                row_num, col, id_key = omid_to_id_info[omid]
                result.errors.append({
                    "type": "missing_provenance",
                    "omid": omid,
                    "identifier": id_key,
                    "file": csv_basename,
                    "row": row_num,
                })

    return result

437 

438 

def main():
    """CLI entry point: verify MetaProcess results and write a JSON report.

    Reads the meta configuration, processes every input CSV (in parallel
    processes when there is more than one file), merges per-file results
    into a single report with errors, warnings and summary statistics, and
    exits with status 1 when any error was found.
    """
    parser = argparse.ArgumentParser(
        description="Check MetaProcess results by verifying input CSV identifiers",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument("meta_config", help="Path to meta_config.yaml file")
    parser.add_argument("output", help="Output file path for results (JSON)")
    parser.add_argument("--workers", type=int, default=QLEVER_MAX_WORKERS, help=f"Max parallel SPARQL workers (default: {QLEVER_MAX_WORKERS})")
    args = parser.parse_args()

    with open(args.meta_config, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    input_csv_dir = config['input_csv_dir']
    base_output_dir = config['output_rdf_dir']
    # RDF serializations live in an 'rdf' subfolder of the configured output dir.
    output_rdf_dir = os.path.join(base_output_dir, 'rdf')
    endpoint_url = config['triplestore_url']
    prov_endpoint_url = config['provenance_triplestore_url']
    if not os.path.exists(output_rdf_dir):
        console.print(f"RDF directory not found at {output_rdf_dir}")
        return

    csv_files = collect_files(input_csv_dir, pattern="*.csv")

    if not csv_files:
        console.print(f"No CSV files found in {input_csv_dir}")
        return

    console.print(f"Found {len(csv_files)} CSV files to process")

    output_dir = os.path.dirname(args.output) or '.'
    os.makedirs(output_dir, exist_ok=True)

    all_file_results: list[FileResult] = []

    # One picklable argument tuple per CSV, as expected by process_csv_file.
    process_args = [(f, endpoint_url, prov_endpoint_url, output_rdf_dir, config['dir_split_number'], config['items_per_file'], config['zip_output_rdf']) for f in csv_files]

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[cyan]{task.completed:.0f}/{task.total:.0f}"),
        TaskProgressColumn(),
        TimeElapsedColumn(),
        EMATimeRemainingColumn(),
        TextColumn("[cyan]{task.fields[detail]}"),
    ) as progress:
        task = progress.add_task("Processing CSV files", total=len(csv_files), detail="")

        if len(csv_files) > 1:
            # Multi-file mode: up to 4 worker processes, splitting the SPARQL
            # worker budget between them; per-phase sub-bars cannot be shown
            # from child processes, so only file-level progress is reported.
            max_procs = min(len(csv_files), 4)
            file_workers = max(1, args.workers // max_procs)
            with ProcessPoolExecutor(
                max_workers=max_procs,
                mp_context=multiprocessing.get_context('forkserver')
            ) as executor:
                future_to_idx = {
                    executor.submit(process_csv_file, pa, workers=file_workers): idx
                    for idx, pa in enumerate(process_args)
                }
                for future in as_completed(future_to_idx):
                    idx = future_to_idx[future]
                    current_file = os.path.basename(csv_files[idx])
                    progress.update(task, detail=f"Completed {current_file}")
                    all_file_results.append(future.result())
                    progress.advance(task)
        else:
            # Single-file mode: run inline so per-phase sub-bars can render.
            for idx, proc_args in enumerate(process_args):
                current_file = os.path.basename(csv_files[idx])
                progress.update(task, detail=f"[{idx+1}/{len(csv_files)}] {current_file}")
                file_result = process_csv_file(proc_args, workers=args.workers, progress=progress, task_id=task)
                all_file_results.append(file_result)
                progress.advance(task)

    errors: list[dict] = []
    merged_id_key_to_omids: dict[str, set[str]] = {}
    merged_id_key_locations: dict[str, list[dict]] = {}

    # Merge identifier->OMID maps across files so duplicates spanning several
    # CSVs are also detected.
    for fr in all_file_results:
        errors.extend(fr.errors)
        for id_key, omids in fr.id_key_to_omids.items():
            if id_key in merged_id_key_to_omids:
                merged_id_key_to_omids[id_key].update(omids)
            else:
                merged_id_key_to_omids[id_key] = set(omids)
        for id_key, locs in fr.id_key_locations.items():
            if id_key in merged_id_key_locations:
                merged_id_key_locations[id_key].extend(locs)
            else:
                merged_id_key_locations[id_key] = list(locs)

    # An identifier resolving to several OMIDs is reported as a warning,
    # not an error (it does not affect the PASS/FAIL status).
    warnings: list[dict] = []
    for id_key, omids in merged_id_key_to_omids.items():
        if len(omids) > 1:
            warnings.append({
                "type": "multiple_omids",
                "identifier": id_key,
                "omid_count": len(omids),
                "omids": sorted(omids),
                "occurrences": merged_id_key_locations[id_key],
            })

    # Aggregate per-file counters into the global summary.
    summary = {k: 0 for k in _STAT_FIELDS}
    files_output = []
    for fr in all_file_results:
        file_dict = {k: getattr(fr, k) for k in ('file',) + _STAT_FIELDS}
        files_output.append(file_dict)
        for k in _STAT_FIELDS:
            summary[k] += getattr(fr, k)

    status = "PASS" if not errors else "FAIL"

    report = {
        "status": status,
        "timestamp": datetime.now().isoformat(),
        "config_path": os.path.abspath(args.meta_config),
        "total_files_processed": len(csv_files),
        "files": files_output,
        "summary": summary,
        "errors": errors,
        "warnings": warnings,
    }

    with open(args.output, 'wb') as f:
        f.write(orjson.dumps(report, option=orjson.OPT_INDENT_2))

    console.print(f"Status: {status}")
    console.print(f"Errors: {len(errors)}, Warnings: {len(warnings)}")
    console.print(f"Results written to: {args.output}")

    if errors:
        sys.exit(1)

570 

# Script entry point.
if __name__ == "__main__":
    main()  # pragma: no cover