Coverage for oc_meta / run / migration / stream_nquads.py: 100%
19 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1#!/usr/bin/env python
3# Copyright 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8import argparse
9import multiprocessing
10import sys
11import zipfile
12from pathlib import Path
14from rdflib import Dataset
15from rich_argparse import RichHelpFormatter
17from oc_meta.lib.file_manager import collect_zip_files
def convert_zip_to_nquads(zip_path: str) -> bytes:
    """Convert one ZIP-compressed JSON-LD archive to UTF-8 N-Quads bytes.

    Opens *zip_path*, locates the first member whose name ends in
    ``.json``, parses it as JSON-LD into an rdflib ``Dataset`` (with
    ``default_union=True`` so all named graphs are serialized), and
    returns the N-Quads serialization encoded as UTF-8.

    Raises:
        ValueError: if the archive contains no ``.json`` member.
        Exception: any other failure (bad ZIP, parse/serialize error) is
            reported on stderr and re-raised so the caller sees it.
    """
    try:
        with zipfile.ZipFile(zip_path, "r") as zf:
            # Use next(..., None) rather than a bare next(): a bare next()
            # would raise StopIteration for an archive without a .json
            # member, and a StopIteration escaping a pool worker into the
            # parent's `for result in pool.imap_unordered(...)` loop
            # disrupts the iteration protocol (cf. PEP 479) instead of
            # surfacing as a real error.
            json_file = next(
                (n for n in zf.namelist() if n.endswith(".json")), None
            )
            if json_file is None:
                raise ValueError(f"No .json member found in archive: {zip_path}")
            graph = Dataset(default_union=True)
            with zf.open(json_file) as f:
                graph.parse(f, format="json-ld")
            return graph.serialize(format="nquads").encode("utf-8")
    except Exception:
        # Log which archive failed before propagating, so a single bad
        # file in a large batch is identifiable from the stderr stream.
        print(f"Failed to convert: {zip_path}", file=sys.stderr, flush=True)
        raise
def main() -> None:  # pragma: no cover
    """CLI entry point: stream N-Quads from JSON-LD ZIP archives to stdout.

    Walks the given RDF directory for ZIP archives (optionally restricted
    to data or provenance files), converts each one to N-Quads in a
    process pool, and writes the resulting bytes to the standard-output
    buffer as they complete.
    """
    arg_parser = argparse.ArgumentParser(
        description="Streams N-Quads from JSON-LD ZIP archives to stdout.",
        formatter_class=RichHelpFormatter,
    )
    arg_parser.add_argument("rdf_dir", type=str, help="Root directory containing RDF ZIP archives")
    arg_parser.add_argument(
        "-m", "--mode", type=str, choices=["all", "data", "prov"], default="all",
        help="Mode: 'all' for all ZIP files (default), 'data' for entity data only, 'prov' for provenance only",
    )
    arg_parser.add_argument(
        "-w", "--workers", type=int, default=None,
        help="Number of worker processes (defaults to min(8, CPU count))",
    )
    args = arg_parser.parse_args()

    root_dir = Path(args.rdf_dir).resolve()
    # Falsy --workers (unset or 0) falls back to a capped CPU-based default.
    worker_count = args.workers or min(8, multiprocessing.cpu_count())

    archives = collect_zip_files(
        str(root_dir),
        only_data=args.mode == "data",
        only_prov=args.mode == "prov",
    )

    out = sys.stdout.buffer
    # forkserver avoids inheriting the parent's full state in each worker.
    mp_context = multiprocessing.get_context("forkserver")
    with mp_context.Pool(processes=worker_count) as worker_pool:
        # imap_unordered: emit each archive's quads as soon as it finishes;
        # output order across archives is irrelevant for N-Quads.
        for nquads_chunk in worker_pool.imap_unordered(
            convert_zip_to_nquads, archives, chunksize=10
        ):
            out.write(nquads_chunk)
            out.flush()
# Script entry point: run the CLI driver when invoked directly.
if __name__ == "__main__":  # pragma: no cover
    main()