# Coverage-report artifact for oc_meta/run/migration/rdf_to_nquads.py:
# 100% coverage, 30 statements (coverage.py v7.13.4, 2026-04-21 09:24 +0000)
1#!/usr/bin/env python
3# Copyright 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
import argparse
import multiprocessing
import os
import zipfile
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
from pathlib import Path

import py7zr
from rdflib import Dataset
from rich_argparse import RichHelpFormatter

from oc_meta.lib.console import console, create_progress
from oc_meta.lib.file_manager import collect_zip_files
def process_zip_file(zip_path: Path, output_dir: Path, input_dir_path: Path, compress: bool) -> None:
    """Convert the JSON-LD payload of one ZIP archive to an N-Quads file.

    Args:
        zip_path: ZIP archive expected to contain a single ``.json`` (JSON-LD) member.
        output_dir: Directory where the ``.nq`` (or ``.nq.7z``) output is written.
        input_dir_path: Root input directory; the archive's path relative to it
            is flattened into the output file name.
        compress: When True, wrap the output in a 7z archive and delete the
            plain ``.nq`` file.

    Raises:
        ValueError: If the archive contains no ``.json`` member.
    """
    # default_union=True so quads from every named graph are serialized together.
    graph = Dataset(default_union=True)
    with zipfile.ZipFile(zip_path, "r") as zf:
        # Use next() with a default: a bare next() would raise StopIteration for a
        # ZIP without a .json member, and that StopIteration would propagate through
        # executor.map() back to the caller's next(iterator), where it is
        # indistinguishable from normal end-of-iteration — silently truncating the run.
        json_file = next((name for name in zf.namelist() if name.endswith(".json")), None)
        if json_file is None:
            raise ValueError(f"No JSON-LD (.json) member found in {zip_path}")
        with zf.open(json_file) as f:
            graph.parse(f, format="json-ld")

    nquads_output = graph.serialize(format="nquads")

    # Flatten the relative path (e.g. "br/060/1.zip" -> "br-060-1.nq") so every
    # output lands directly in output_dir with a collision-free name.
    relative_path = zip_path.relative_to(input_dir_path)
    output_filename = str(relative_path).replace(os.sep, "-")
    output_filename = Path(output_filename).with_suffix(".nq").name
    output_nq_path = output_dir / output_filename

    with open(output_nq_path, "w", encoding="utf-8") as f:
        f.write(nquads_output)

    if compress:
        output_7z_path = output_nq_path.with_suffix(".nq.7z")
        with py7zr.SevenZipFile(output_7z_path, "w") as archive:
            archive.write(output_nq_path, output_filename)
        # Keep only the compressed artifact.
        output_nq_path.unlink()
def main() -> None:  # pragma: no cover
    """CLI entry point: convert JSON-LD ZIP archives to N-Quads in parallel.

    Parses command-line arguments, discovers ZIP files under the input
    directory, fans the conversion out over a process pool, and prints a
    per-file progress bar plus a final success/failure report.
    """
    parser = argparse.ArgumentParser(
        description="Converts JSON-LD files from ZIP archives to N-Quads format.",
        formatter_class=RichHelpFormatter
    )
    parser.add_argument("input_dir", type=str, help="Input directory containing ZIP files (recursive search)")
    parser.add_argument("output_dir", type=str, help="Output directory for the converted .nq files")
    parser.add_argument("-m", "--mode", type=str, choices=["all", "data", "prov"], default="all",
                        help="Mode: 'all' for all ZIP files (default), 'data' for entity data only, 'prov' for provenance only")
    parser.add_argument("-w", "--workers", type=int, default=None, help="Number of worker processes (defaults to CPU count)")
    parser.add_argument("-c", "--compress", action="store_true", help="Compress output files using 7z format")
    args = parser.parse_args()

    input_path = Path(args.input_dir).resolve()
    output_path = Path(args.output_dir).resolve()
    num_workers = args.workers if args.workers else multiprocessing.cpu_count()

    output_path.mkdir(parents=True, exist_ok=True)

    zip_files = collect_zip_files(
        str(input_path),
        only_data=args.mode == "data",
        only_prov=args.mode == "prov",
    )
    total_files = len(zip_files)

    mode_labels = {"all": "", "data": "data ", "prov": "provenance "}
    console.print(f"Found {total_files} {mode_labels[args.mode]}ZIP files in {input_path}")
    console.print(f"Output directory: {output_path}")
    console.print(f"Workers: {num_workers}")
    console.print(f"Compression: {'7z' if args.compress else 'none'}")

    fail_count = 0
    task_func = partial(process_zip_file, output_dir=output_path, input_dir_path=input_path, compress=args.compress)

    # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
    ctx = multiprocessing.get_context('forkserver')
    with ProcessPoolExecutor(max_workers=num_workers, mp_context=ctx) as executor:
        # submit + as_completed rather than executor.map: map's result iterator
        # terminates permanently after the first exception it re-raises, so a
        # single worker failure would silently skip every remaining archive
        # while being counted as just one failure.
        futures = [executor.submit(task_func, zip_path) for zip_path in zip_files]
        with create_progress() as progress:
            task = progress.add_task("Converting", total=total_files)
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    console.print(f"[red]Error: {e}[/red]")
                    fail_count += 1
                progress.update(task, advance=1)

    console.print()
    console.print("Final report")
    console.print(f" Success: {total_files - fail_count}")
    console.print(f" Failed: {fail_count}")
# Standard script guard: run the CLI only when executed directly, not on import.
if __name__ == "__main__":  # pragma: no cover
    main()