Coverage for oc_meta / run / migration / rdf_to_nquads.py: 100%

30 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1#!/usr/bin/env python 

2 

3# Copyright 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8import argparse 

9import multiprocessing 

10import os 

11import zipfile 

12from concurrent.futures import ProcessPoolExecutor 

13from functools import partial 

14from pathlib import Path 

15 

16import py7zr 

17from rdflib import Dataset 

18from rich_argparse import RichHelpFormatter 

19 

20from oc_meta.lib.console import console, create_progress 

21from oc_meta.lib.file_manager import collect_zip_files 

22 

23 

def process_zip_file(zip_path: Path, output_dir: Path, input_dir_path: Path, compress: bool) -> None:
    """Convert one JSON-LD ZIP archive to an N-Quads file.

    The archive must contain at least one ``.json`` member holding JSON-LD
    data. The serialized N-Quads are written to ``output_dir`` under a
    filename derived from the archive's path relative to ``input_dir_path``
    (path separators replaced by ``-``). When ``compress`` is True the
    output is repackaged as ``.nq.7z`` and the intermediate ``.nq`` file is
    deleted.

    :param zip_path: Path of the input ZIP archive.
    :param output_dir: Directory that receives the converted file.
    :param input_dir_path: Input root used to derive the flattened output name.
    :param compress: If True, 7z-compress the result and remove the raw .nq.
    :raises ValueError: If the archive contains no ``.json`` member.
    """
    # default_union=True so triples from every named graph are serialized.
    graph = Dataset(default_union=True)
    with zipfile.ZipFile(zip_path, "r") as zf:
        json_file = next((name for name in zf.namelist() if name.endswith(".json")), None)
        if json_file is None:
            # Raise a clear error instead of letting next() raise
            # StopIteration: a StopIteration escaping the worker is turned
            # into an opaque RuntimeError (PEP 479) in the parent process.
            raise ValueError(f"No .json member found in {zip_path}")
        with zf.open(json_file) as f:
            graph.parse(f, format="json-ld")

    nquads_output = graph.serialize(format="nquads")

    # Flatten the relative path into one filename: "a/b/1.zip" -> "a-b-1.nq".
    relative_path = zip_path.relative_to(input_dir_path)
    output_filename = str(relative_path).replace(os.sep, "-")
    output_filename = Path(output_filename).with_suffix(".nq").name
    output_nq_path = output_dir / output_filename

    with open(output_nq_path, "w", encoding="utf-8") as f:
        f.write(nquads_output)

    if compress:
        output_7z_path = output_nq_path.with_suffix(".nq.7z")
        with py7zr.SevenZipFile(output_7z_path, "w") as archive:
            archive.write(output_nq_path, output_filename)
        # Keep only the compressed archive.
        output_nq_path.unlink()

46 

47 

def main() -> None:  # pragma: no cover
    """CLI entry point: bulk-convert JSON-LD ZIP archives to N-Quads."""
    parser = argparse.ArgumentParser(
        description="Converts JSON-LD files from ZIP archives to N-Quads format.",
        formatter_class=RichHelpFormatter
    )
    parser.add_argument("input_dir", type=str, help="Input directory containing ZIP files (recursive search)")
    parser.add_argument("output_dir", type=str, help="Output directory for the converted .nq files")
    parser.add_argument("-m", "--mode", type=str, choices=["all", "data", "prov"], default="all",
                        help="Mode: 'all' for all ZIP files (default), 'data' for entity data only, 'prov' for provenance only")
    parser.add_argument("-w", "--workers", type=int, default=None, help="Number of worker processes (defaults to CPU count)")
    parser.add_argument("-c", "--compress", action="store_true", help="Compress output files using 7z format")
    args = parser.parse_args()

    input_path = Path(args.input_dir).resolve()
    output_path = Path(args.output_dir).resolve()
    num_workers = args.workers or multiprocessing.cpu_count()

    output_path.mkdir(parents=True, exist_ok=True)

    zip_files = collect_zip_files(
        str(input_path),
        only_data=(args.mode == "data"),
        only_prov=(args.mode == "prov"),
    )
    total_files = len(zip_files)

    mode_labels = {"all": "", "data": "data ", "prov": "provenance "}
    console.print(f"Found {total_files} {mode_labels[args.mode]}ZIP files in {input_path}")
    console.print(f"Output directory: {output_path}")
    console.print(f"Workers: {num_workers}")
    console.print(f"Compression: {'7z' if args.compress else 'none'}")

    fail_count = 0
    convert = partial(process_zip_file, output_dir=output_path, input_dir_path=input_path, compress=args.compress)

    # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
    ctx = multiprocessing.get_context('forkserver')
    with ProcessPoolExecutor(max_workers=num_workers, mp_context=ctx) as executor:
        results = executor.map(convert, zip_files)

        with create_progress() as progress:
            task = progress.add_task("Converting", total=total_files)
            # Drain the result iterator manually: executor.map keeps
            # yielding remaining results after one item raises, so a
            # per-item failure is logged without aborting the run.
            exhausted = False
            while not exhausted:
                try:
                    next(results)
                except StopIteration:
                    exhausted = True
                except Exception as e:
                    console.print(f"[red]Error: {e}[/red]")
                    fail_count += 1
                    progress.update(task, advance=1)
                else:
                    progress.update(task, advance=1)

    console.print()
    console.print("Final report")
    console.print(f" Success: {total_files - fail_count}")
    console.print(f" Failed: {fail_count}")

106 

# Script entry point; excluded from coverage since it only dispatches to main().
if __name__ == "__main__":  # pragma: no cover
    main()