Coverage report for oc_meta/run/migration/stream_nquads.py: 100% (19 statements).
Generated by coverage.py v7.13.4 at 2026-04-21 09:24 +0000.

1#!/usr/bin/env python 

2 

3# Copyright 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8import argparse 

9import multiprocessing 

10import sys 

11import zipfile 

12from pathlib import Path 

13 

14from rdflib import Dataset 

15from rich_argparse import RichHelpFormatter 

16 

17from oc_meta.lib.file_manager import collect_zip_files 

18 

19 

def convert_zip_to_nquads(zip_path: str) -> bytes:
    """Convert one JSON-LD ZIP archive into UTF-8 encoded N-Quads bytes.

    Opens *zip_path*, locates the first ``.json`` member, parses it as
    JSON-LD into a Dataset (with ``default_union=True`` so the default
    graph exposes triples from every named graph), and returns the
    N-Quads serialization encoded as UTF-8.

    Raises:
        FileNotFoundError: if the archive contains no ``.json`` member.
        Exception: any other failure is reported to stderr (naming the
            offending archive) and re-raised so the worker pool
            propagates it to the parent process.
    """
    try:
        with zipfile.ZipFile(zip_path, "r") as zf:
            # Use next() with a default instead of letting a bare next()
            # raise a cryptic StopIteration when no JSON member exists.
            json_file = next(
                (n for n in zf.namelist() if n.endswith(".json")), None
            )
            if json_file is None:
                raise FileNotFoundError(
                    f"No .json member found in archive: {zip_path}"
                )
            graph = Dataset(default_union=True)
            with zf.open(json_file) as f:
                graph.parse(f, format="json-ld")
            return graph.serialize(format="nquads").encode("utf-8")
    except Exception:
        # Identify the failing archive before propagating: the parent
        # process otherwise only sees the bare re-raised exception.
        print(f"Failed to convert: {zip_path}", file=sys.stderr, flush=True)
        raise

31 

32 

def main() -> None:  # pragma: no cover
    """CLI entry point: convert JSON-LD ZIP archives and stream N-Quads to stdout."""
    parser = argparse.ArgumentParser(
        description="Streams N-Quads from JSON-LD ZIP archives to stdout.",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "rdf_dir",
        type=str,
        help="Root directory containing RDF ZIP archives",
    )
    parser.add_argument(
        "-m",
        "--mode",
        type=str,
        choices=["all", "data", "prov"],
        default="all",
        help="Mode: 'all' for all ZIP files (default), 'data' for entity data only, 'prov' for provenance only",
    )
    parser.add_argument(
        "-w",
        "--workers",
        type=int,
        default=None,
        help="Number of worker processes (defaults to min(8, CPU count))",
    )
    args = parser.parse_args()

    root = Path(args.rdf_dir).resolve()
    # Fall back to a capped CPU count when no worker count is supplied.
    workers = args.workers or min(8, multiprocessing.cpu_count())

    archives = collect_zip_files(
        str(root),
        only_data=args.mode == "data",
        only_prov=args.mode == "prov",
    )

    out = sys.stdout.buffer
    # forkserver avoids inheriting the parent's full state in workers.
    ctx = multiprocessing.get_context("forkserver")
    with ctx.Pool(processes=workers) as pool:
        # imap_unordered emits results as soon as any worker finishes;
        # output ordering across archives is not meaningful here.
        for nquads in pool.imap_unordered(
            convert_zip_to_nquads, archives, chunksize=10
        ):
            out.write(nquads)
            out.flush()

59 

60 

# Script entry point: runs only when executed directly, not on import.
if __name__ == "__main__":  # pragma: no cover
    main()