Coverage for oc_meta/run/meta/check_results.py: 91% (379 statements)
# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# SPDX-License-Identifier: ISC

import argparse
import multiprocessing
import os
import sys
import zipfile
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Dict, List, Set

import orjson
import polars as pl
import yaml
from rich.progress import BarColumn, Progress, TaskProgressColumn, TextColumn, TimeElapsedColumn
from rich_argparse import RichHelpFormatter
from oc_meta.constants import QLEVER_BATCH_SIZE, QLEVER_MAX_WORKERS
from oc_meta.lib.cleaner import normalize_hyphens, normalize_id
from oc_meta.lib.console import EMATimeRemainingColumn, console
from oc_meta.lib.file_manager import collect_files
from oc_meta.lib.master_of_regex import RE_ENTITY_URI, RE_SEMICOLON_IN_PEOPLE_FIELD, split_name_and_ids
from oc_meta.lib.sparql import run_queries_parallel

MAX_RETRIES = 10
RETRY_BACKOFF = 2
DATACITE_PREFIX = "http://purl.org/spar/datacite/"

_SPACE_PATTERN = '[\t\xa0\u200b\u202f\u2003\u2005\u2009]'
_ID_COLUMNS = ['id', 'author', 'editor', 'publisher', 'venue']
_STAT_FIELDS = (
    'total_rows', 'rows_with_ids', 'total_identifiers', 'omid_schema_identifiers',
    'identifiers_skipped_invalid',
    'identifiers_with_omids', 'identifiers_without_omids', 'data_graphs_found',
    'data_graphs_missing', 'prov_graphs_found', 'prov_graphs_missing',
    'omids_with_provenance', 'omids_without_provenance',
)


@dataclass
class FileResult:
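    """Per-CSV-file validation counters, plus the errors and identifier-to-OMID
    mappings collected while checking that file."""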
    file: str
    total_rows: int = 0
    rows_with_ids: int = 0
    total_identifiers: int = 0
    omid_schema_identifiers: int = 0
    identifiers_skipped_invalid: int = 0
    identifiers_with_omids: int = 0
    identifiers_without_omids: int = 0
    data_graphs_found: int = 0
    data_graphs_missing: int = 0
    prov_graphs_found: int = 0
    prov_graphs_missing: int = 0
    omids_with_provenance: int = 0
    omids_without_provenance: int = 0
    errors: list = field(default_factory=list)
    id_key_to_omids: dict = field(default_factory=dict)
    id_key_locations: dict = field(default_factory=dict)


def check_provenance_existence(omids: List[str], prov_endpoint_url: str, workers: int = QLEVER_MAX_WORKERS, progress_callback: Callable[[int], None] | None = None) -> Dict[str, bool]:
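    """Check on the provenance SPARQL endpoint whether the first snapshot
    (``<omid>/prov/se/1``) of each OMID has a ``prov:specializationOf`` triple.

    Returns a dict mapping every input OMID to True or False."""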
    if not omids:
        return {}

    prov_results = {omid: False for omid in omids}

    batch_queries = []
    batch_sizes = []
    for i in range(0, len(omids), QLEVER_BATCH_SIZE):
        batch = omids[i:i + QLEVER_BATCH_SIZE]
        values_entries = " ".join(f"<{omid}/prov/se/1>" for omid in batch)
        query = f"""
        SELECT ?snapshot WHERE {{
            VALUES ?snapshot {{ {values_entries} }}
            ?snapshot <http://www.w3.org/ns/prov#specializationOf> ?o .
        }}
        """
        batch_queries.append(query)
        batch_sizes.append(len(batch))

    all_bindings = run_queries_parallel(prov_endpoint_url, batch_queries, batch_sizes, workers, progress_callback, max_retries=MAX_RETRIES, backoff_factor=RETRY_BACKOFF)

    for bindings in all_bindings:
        for result in bindings:
            snapshot_uri = result["snapshot"]["value"]
            omid = snapshot_uri.rsplit("/prov/se/1", 1)[0]
            prov_results[omid] = True

    return prov_results


def check_omids_existence(identifiers: List[Dict[str, str]], endpoint_url: str, workers: int = QLEVER_MAX_WORKERS, progress_callback: Callable[[int], None] | None = None) -> Dict[str, Set[str]]:
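    """Resolve external identifiers to OMIDs by querying the data SPARQL endpoint
    in batches of ``QLEVER_BATCH_SIZE``.

    ``identifiers`` is a list of ``{'schema': ..., 'value': ...}`` dicts; the
    result maps each ``"schema:value"`` key that was found to the set of OMIDs
    whose literal value and datacite scheme match it."""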
    if not identifiers:
        return {}

    found_omids: Dict[str, Set[str]] = {}

    batch_queries = []
    batch_sizes = []
    for i in range(0, len(identifiers), QLEVER_BATCH_SIZE):
        batch = identifiers[i:i + QLEVER_BATCH_SIZE]

        values_entries = []
        for identifier in batch:
            escaped_value = identifier['value'].replace('\\', '\\\\').replace('"', '\\"')
            values_entries.append(f'("{escaped_value}"^^xsd:string datacite:{identifier["schema"]})')

        query = f"""
        PREFIX datacite: <http://purl.org/spar/datacite/>
        PREFIX literal: <http://www.essepuntato.it/2010/06/literalreification/>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        SELECT ?val ?scheme ?omid
        WHERE {{
            VALUES (?val ?scheme) {{ {" ".join(values_entries)} }}
            ?omid literal:hasLiteralValue ?val ;
                  datacite:usesIdentifierScheme ?scheme .
        }}
        """
        batch_queries.append(query)
        batch_sizes.append(len(batch))

    all_bindings = run_queries_parallel(endpoint_url, batch_queries, batch_sizes, workers, progress_callback, max_retries=MAX_RETRIES, backoff_factor=RETRY_BACKOFF)

    for bindings in all_bindings:
        for result in bindings:
            omid = result["omid"]["value"]
            val = result["val"]["value"]
            scheme_uri = result["scheme"]["value"]
            scheme = scheme_uri[len(DATACITE_PREFIX):] if scheme_uri.startswith(DATACITE_PREFIX) else scheme_uri
            id_key = f"{scheme}:{val}"
            if id_key not in found_omids:
                found_omids[id_key] = set()
            found_omids[id_key].add(omid)

    return found_omids


def find_file(rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str, zip_output_rdf: bool) -> str | None:
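    """Map an entity URI to the path of the RDF file that should contain it,
    following the ``dir_split_number``/``items_per_file`` folder layout used when
    the RDF output was written. Returns None if the URI does not match
    RE_ENTITY_URI."""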
    entity_match = RE_ENTITY_URI.match(uri)
    if entity_match:
        cur_number = int(entity_match.group('entity_number'))
        cur_file_split = ((cur_number - 1) // items_per_file + 1) * items_per_file
        cur_split = ((cur_number - 1) // dir_split_number + 1) * dir_split_number
        short_name = entity_match.group('short_name')
        sub_folder = entity_match.group('supplier_prefix')
        cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
        extension = '.zip' if zip_output_rdf else '.json'
        cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + extension
        return cur_file_path
    return None


def find_prov_file(data_zip_path: str) -> str | None:
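    """Return the path of the provenance archive (``<data file>/prov/se.zip``)
    that accompanies a data zip file, or None if it does not exist."""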
    base_dir = os.path.dirname(data_zip_path)
    file_name = os.path.splitext(os.path.basename(data_zip_path))[0]
    prov_dir = os.path.join(base_dir, file_name, 'prov')
    prov_file = os.path.join(prov_dir, 'se.zip')
    return prov_file if os.path.exists(prov_file) else None


def _check_zip_file(args: tuple) -> tuple[str, dict[str, tuple[bool, bool]]]:
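    """Worker for one data zip file: read the zipped JSON once and check, with a
    byte-level substring search (no parsing), which of the given OMIDs appear in
    the data file and in its provenance archive.

    Returns the zip path and a mapping OMID -> (data_found, prov_found)."""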
    zip_path, omids = args

    raw_data = b''
    with zipfile.ZipFile(zip_path, 'r') as z:
        json_files = [f for f in z.namelist() if f.endswith('.json')]
        if json_files:
            with z.open(json_files[0]) as f:
                raw_data = f.read()

    raw_prov = b''
    prov_path = find_prov_file(zip_path)
    if prov_path:
        with zipfile.ZipFile(prov_path, 'r') as z:
            json_files = [f for f in z.namelist() if f.endswith('.json')]
            if json_files:
                with z.open(json_files[0]) as f:
                    raw_prov = f.read()

    results: dict[str, tuple[bool, bool]] = {}
    for omid in omids:
        omid_bytes = (omid + '"').encode()
        results[omid] = (omid_bytes in raw_data, (omid + '/prov/"').encode() in raw_prov)
    return zip_path, results


def _extract_id_pairs(cell: str, col: str) -> list[tuple[str, str]]:
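    """Extract (schema, value) pairs from a CSV cell.

    The ``id`` column holds space-separated ``schema:value`` tokens; the people
    and venue columns hold names followed by their identifiers, which are split
    out with RE_SEMICOLON_IN_PEOPLE_FIELD and split_name_and_ids. Schemas are
    lower-cased and values hyphen-normalised."""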
    pairs: list[tuple[str, str]] = []
    if col == 'id':
        for token in cell.strip().split():
            colon_pos = token.find(':')
            if colon_pos > 0:
                pairs.append((token[:colon_pos].lower(), normalize_hyphens(token[colon_pos + 1:])))
    else:
        for element in RE_SEMICOLON_IN_PEOPLE_FIELD.split(cell):
            _, ids_str = split_name_and_ids(element)
            if ids_str:
                for token in ids_str.strip().split():
                    colon_pos = token.find(':')
                    if colon_pos > 0:
                        pairs.append((token[:colon_pos].lower(), normalize_hyphens(token[colon_pos + 1:])))
    return pairs


def process_csv_file(args: tuple, workers: int = QLEVER_MAX_WORKERS, progress=None, task_id=None) -> FileResult:
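    """Validate one input CSV in five phases: extract identifiers, resolve them to
    OMIDs on the data endpoint, map the OMIDs to files of the RDF dump, check those
    files for data and provenance graphs, and check provenance snapshots on the
    provenance endpoint.

    ``args`` bundles (csv_file, endpoint_url, prov_endpoint_url, rdf_dir,
    dir_split_number, items_per_file, zip_output_rdf) so the function can be used
    directly as a ProcessPoolExecutor target."""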
    csv_file, endpoint_url, prov_endpoint_url, rdf_dir, dir_split_number, items_per_file, zip_output_rdf = args

    result = FileResult(file=os.path.basename(csv_file))

    if progress and task_id is not None:
        progress.update(task_id, detail="Phase 1/5: Reading CSV")

    with open(csv_file, 'rb') as f:
        raw = f.read()
    df = pl.read_csv(raw.replace(b'\0', b''), columns=_ID_COLUMNS, infer_schema_length=0)
    # Normalise exotic space characters to plain spaces, then collapse double spaces.
    df = df.with_columns([
        pl.col(c)
        .str.replace_all(_SPACE_PATTERN, ' ')
        .str.replace_all('  ', ' ', literal=True)
        for c in _ID_COLUMNS
    ])
    result.total_rows = len(df)
    col_lists = {col: df[col].to_list() for col in _ID_COLUMNS}
    del df

    unique_id_keys: set[str] = set()
    id_key_meta: dict[str, tuple[str, str]] = {}
    id_key_occurrences: dict[str, list[tuple[int, str]]] = {}
    omid_values: list[str] = []

    phase1_task = None
    if progress:
        phase1_task = progress.add_task(" Phase 1/5: Extracting identifiers", total=result.total_rows, detail="")

    for row_idx in range(result.total_rows):
        row_has_ids = False
        for col in _ID_COLUMNS:
            cell = col_lists[col][row_idx]
            if not cell:
                continue

            pairs = _extract_id_pairs(cell, col)
            if not pairs:
                continue

            row_has_ids = True
            row_num = row_idx + 1
            result.total_identifiers += len(pairs)

            for schema, value in pairs:
                if schema == 'omid':
                    result.omid_schema_identifiers += 1
                    if value.startswith('http'):
                        omid_values.append(value)
                    continue
                normalized = normalize_id(f"{schema}:{value}")
                if not normalized:
                    result.identifiers_skipped_invalid += 1
                    continue
                norm_schema, norm_value = normalized.split(':', 1)
                id_key = normalized
                unique_id_keys.add(id_key)
                if id_key not in id_key_meta:
                    id_key_meta[id_key] = (norm_schema, norm_value)
                if id_key not in id_key_occurrences:
                    id_key_occurrences[id_key] = []
                id_key_occurrences[id_key].append((row_num, col))

        if row_has_ids:
            result.rows_with_ids += 1
        if progress and phase1_task is not None:
            progress.advance(phase1_task)

    if progress and phase1_task is not None:
        progress.update(phase1_task, visible=False)

    del col_lists

    all_identifiers = [{'schema': sv[0], 'value': sv[1]} for sv in id_key_meta.values()]
    total_ids = len(all_identifiers)

    phase2_task = None
    if progress:
        phase2_task = progress.add_task(" Phase 2/5: Querying DB", total=total_ids, detail="")

    def on_id_batch(batch_size: int):
        if progress and phase2_task is not None:
            progress.advance(phase2_task, batch_size)

    identifier_cache = check_omids_existence(all_identifiers, endpoint_url, workers=workers, progress_callback=on_id_batch)

    if progress and phase2_task is not None:
        progress.update(phase2_task, visible=False)

    omids_by_file: dict[str, set[str]] = {}
    all_omids: set[str] = set()
    omid_to_id_info: dict[str, tuple[int, str, str]] = {}
    path_exists_cache: dict[str, bool] = {}
    csv_basename = os.path.basename(csv_file)

    phase3_task = None
    if progress:
        phase3_task = progress.add_task(" Phase 3/5: Mapping OMIDs", total=len(unique_id_keys), detail="")

    for id_key in unique_id_keys:
        occurrences = id_key_occurrences[id_key]
        if id_key in identifier_cache:
            result.identifiers_with_omids += len(occurrences)
            omids = identifier_cache[id_key]
            result.id_key_to_omids[id_key] = omids
            result.id_key_locations[id_key] = [
                {'file': csv_basename, 'row': r, 'column': c}
                for r, c in occurrences
            ]
            all_omids.add(next(iter(omids)))
            for omid in omids:
                if omid not in omid_to_id_info:
                    omid_to_id_info[omid] = (occurrences[0][0], occurrences[0][1], id_key)
                zip_path = find_file(rdf_dir, dir_split_number, items_per_file, omid, zip_output_rdf)
                if zip_path:
                    if zip_path in omids_by_file:
                        omids_by_file[zip_path].add(omid)
                    else:
                        if zip_path not in path_exists_cache:
                            path_exists_cache[zip_path] = os.path.exists(zip_path)
                        if path_exists_cache[zip_path]:
                            omids_by_file[zip_path] = {omid}
        else:
            result.identifiers_without_omids += len(occurrences)
            schema, value = id_key_meta[id_key]
            for row_num, col in occurrences:
                result.errors.append({
                    "type": "missing_omid",
                    "schema": schema,
                    "value": value,
                    "file": csv_basename,
                    "row": row_num,
                    "column": col,
                })
        if progress and phase3_task is not None:
            progress.advance(phase3_task)

    if progress and phase3_task is not None:
        progress.update(phase3_task, visible=False)

    for omid_uri in omid_values:
        all_omids.add(omid_uri)

    total_rdf_files = len(omids_by_file)
    total_omids = len(all_omids)
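    # Phase 5's provenance check runs on the SPARQL endpoint in a single background
    # process while phase 4 scans the RDF zip files locally; its result is collected below.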
    prov_future = None
    prov_executor = None
    if total_omids > 0:
        prov_executor = ProcessPoolExecutor(
            max_workers=1,
            mp_context=multiprocessing.get_context('forkserver')
        )
        prov_future = prov_executor.submit(
            check_provenance_existence, list(all_omids), prov_endpoint_url, workers
        )

    phase4_task = None
    if progress:
        phase4_task = progress.add_task(" Phase 4/5: Checking RDF files", total=total_rdf_files, detail="")

    zip_args = [(zp, list(omids)) for zp, omids in omids_by_file.items()]

    def _apply_zip_results(zip_results: dict[str, tuple[bool, bool]]) -> None:
        for omid, (data_found, prov_found) in zip_results.items():
            if data_found:
                result.data_graphs_found += 1
            else:
                result.data_graphs_missing += 1
            if prov_found:
                result.prov_graphs_found += 1
            else:
                result.prov_graphs_missing += 1

    if zip_args and workers > 1:
        with ProcessPoolExecutor(
            max_workers=min(len(zip_args), workers),
            mp_context=multiprocessing.get_context('forkserver')
        ) as executor:
            for future in as_completed(
                {executor.submit(_check_zip_file, a): a for a in zip_args}
            ):
                _zip_path, zip_results = future.result()
                _apply_zip_results(zip_results)
                if progress and phase4_task is not None:
                    progress.advance(phase4_task)
    else:
        for a in zip_args:
            _zip_path, zip_results = _check_zip_file(a)
            _apply_zip_results(zip_results)
            if progress and phase4_task is not None:
                progress.advance(phase4_task)

    if progress and phase4_task is not None:
        progress.update(phase4_task, visible=False)

    phase5_task = None
    if progress:
        phase5_task = progress.add_task(" Phase 5/5: Checking provenance", total=total_omids, detail="")

    prov_results: Dict[str, bool] = {}
    if prov_future and prov_executor:
        prov_results = prov_future.result()
        prov_executor.shutdown(wait=False)

    if progress and phase5_task is not None:
        progress.advance(phase5_task, total_omids)
        progress.update(phase5_task, visible=False)

    for omid, has_prov in prov_results.items():
        if has_prov:
            result.omids_with_provenance += 1
        else:
            result.omids_without_provenance += 1
            if omid in omid_to_id_info:
                row_num, col, id_key = omid_to_id_info[omid]
                result.errors.append({
                    "type": "missing_provenance",
                    "omid": omid,
                    "identifier": id_key,
                    "file": csv_basename,
                    "row": row_num,
                })

    return result


def main():
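    """CLI entry point.

    Usage: check_results.py <meta_config.yaml> <output.json> [--workers N]

    Reads the MetaProcess configuration, validates every input CSV (in parallel
    when there is more than one file), merges the per-file results into a JSON
    report, and exits with status 1 if any error was found."""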
    parser = argparse.ArgumentParser(
        description="Check MetaProcess results by verifying input CSV identifiers",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument("meta_config", help="Path to meta_config.yaml file")
    parser.add_argument("output", help="Output file path for results (JSON)")
    parser.add_argument("--workers", type=int, default=QLEVER_MAX_WORKERS, help=f"Max parallel SPARQL workers (default: {QLEVER_MAX_WORKERS})")
    args = parser.parse_args()

    with open(args.meta_config, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    input_csv_dir = config['input_csv_dir']
    base_output_dir = config['output_rdf_dir']
    output_rdf_dir = os.path.join(base_output_dir, 'rdf')
    endpoint_url = config['triplestore_url']
    prov_endpoint_url = config['provenance_triplestore_url']
    if not os.path.exists(output_rdf_dir):
        console.print(f"RDF directory not found at {output_rdf_dir}")
        return

    csv_files = collect_files(input_csv_dir, pattern="*.csv")

    if not csv_files:
        console.print(f"No CSV files found in {input_csv_dir}")
        return

    console.print(f"Found {len(csv_files)} CSV files to process")

    output_dir = os.path.dirname(args.output) or '.'
    os.makedirs(output_dir, exist_ok=True)

    all_file_results: list[FileResult] = []

    process_args = [(f, endpoint_url, prov_endpoint_url, output_rdf_dir, config['dir_split_number'], config['items_per_file'], config['zip_output_rdf']) for f in csv_files]

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[cyan]{task.completed:.0f}/{task.total:.0f}"),
        TaskProgressColumn(),
        TimeElapsedColumn(),
        EMATimeRemainingColumn(),
        TextColumn("[cyan]{task.fields[detail]}"),
    ) as progress:
        task = progress.add_task("Processing CSV files", total=len(csv_files), detail="")

        if len(csv_files) > 1:
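            # Spread the SPARQL worker budget across at most four concurrent CSV processes.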
            max_procs = min(len(csv_files), 4)
            file_workers = max(1, args.workers // max_procs)
            with ProcessPoolExecutor(
                max_workers=max_procs,
                mp_context=multiprocessing.get_context('forkserver')
            ) as executor:
                future_to_idx = {
                    executor.submit(process_csv_file, pa, workers=file_workers): idx
                    for idx, pa in enumerate(process_args)
                }
                for future in as_completed(future_to_idx):
                    idx = future_to_idx[future]
                    current_file = os.path.basename(csv_files[idx])
                    progress.update(task, detail=f"Completed {current_file}")
                    all_file_results.append(future.result())
                    progress.advance(task)
        else:
            for idx, proc_args in enumerate(process_args):
                current_file = os.path.basename(csv_files[idx])
                progress.update(task, detail=f"[{idx+1}/{len(csv_files)}] {current_file}")
                file_result = process_csv_file(proc_args, workers=args.workers, progress=progress, task_id=task)
                all_file_results.append(file_result)
                progress.advance(task)

    errors: list[dict] = []
    merged_id_key_to_omids: dict[str, set[str]] = {}
    merged_id_key_locations: dict[str, list[dict]] = {}

    for fr in all_file_results:
        errors.extend(fr.errors)
        for id_key, omids in fr.id_key_to_omids.items():
            if id_key in merged_id_key_to_omids:
                merged_id_key_to_omids[id_key].update(omids)
            else:
                merged_id_key_to_omids[id_key] = set(omids)
        for id_key, locs in fr.id_key_locations.items():
            if id_key in merged_id_key_locations:
                merged_id_key_locations[id_key].extend(locs)
            else:
                merged_id_key_locations[id_key] = list(locs)

    warnings: list[dict] = []
    for id_key, omids in merged_id_key_to_omids.items():
        if len(omids) > 1:
            warnings.append({
                "type": "multiple_omids",
                "identifier": id_key,
                "omid_count": len(omids),
                "omids": sorted(omids),
                "occurrences": merged_id_key_locations[id_key],
            })

    summary = {k: 0 for k in _STAT_FIELDS}
    files_output = []
    for fr in all_file_results:
        file_dict = {k: getattr(fr, k) for k in ('file',) + _STAT_FIELDS}
        files_output.append(file_dict)
        for k in _STAT_FIELDS:
            summary[k] += getattr(fr, k)

    status = "PASS" if not errors else "FAIL"

    report = {
        "status": status,
        "timestamp": datetime.now().isoformat(),
        "config_path": os.path.abspath(args.meta_config),
        "total_files_processed": len(csv_files),
        "files": files_output,
        "summary": summary,
        "errors": errors,
        "warnings": warnings,
    }

    with open(args.output, 'wb') as f:
        f.write(orjson.dumps(report, option=orjson.OPT_INDENT_2))

    console.print(f"Status: {status}")
    console.print(f"Errors: {len(errors)}, Warnings: {len(warnings)}")
    console.print(f"Results written to: {args.output}")

    if errors:
        sys.exit(1)


if __name__ == "__main__":
    main()  # pragma: no cover