# Coverage report artifact: oc_meta/run/migration/provenance_to_nquads.py — 20% of 76 statements covered (coverage.py v7.13.4, 2026-03-03)
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# Copyright 2026, Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
17import argparse
18import multiprocessing
19import os
20import zipfile
21from functools import partial
22from pathlib import Path
24from pebble import ProcessPool
25from rdflib import Dataset
26from rich.console import Console
27from rich.progress import (BarColumn, MofNCompleteColumn, Progress, TextColumn,
28 TimeElapsedColumn, TimeRemainingColumn)
29from rich_argparse import RichHelpFormatter
# Shared console for warnings/errors; stderr keeps them out of piped stdout.
console = Console(stderr=True)
def convert_jsonld_to_nquads(jsonld_content: str) -> tuple[Dataset, str]:
    """Parse a JSON-LD string and serialize it to N-Quads.

    Returns both the parsed ``Dataset`` and its N-Quads serialization, so
    the caller can count quads without having to re-parse the data.
    """
    dataset = Dataset(default_union=True)
    dataset.parse(data=jsonld_content, format="json-ld")
    serialized = dataset.serialize(format="nquads")
    return dataset, serialized
def process_zip_file(zip_path: Path, output_dir: Path, input_dir_path: Path) -> bool:
    """Convert the JSON-LD payload of one se.zip archive to an N-Quads file.

    The output filename is derived from the zip's path relative to
    *input_dir_path*, with path separators flattened to hyphens, so archives
    from different subdirectories cannot collide in *output_dir*.

    Returns True when the input/output quad counts match, False otherwise.
    """
    with zipfile.ZipFile(zip_path, "r") as zf:
        json_files = [name for name in zf.namelist() if name.endswith(".json")]
        if not json_files:
            # BUGFIX: the original indexed json_files[0] unconditionally and
            # raised IndexError on an archive with no JSON entry.
            console.print(f"[red]No JSON file found in {zip_path}[/red]")
            return False
        # Only the first JSON member is converted (archives are expected to
        # contain exactly one).
        jsonld_content = zf.read(json_files[0]).decode("utf-8")

    input_graph, nquads_output = convert_jsonld_to_nquads(jsonld_content)
    input_quad_count = len(input_graph)

    relative_path = zip_path.relative_to(input_dir_path)
    output_filename = Path(str(relative_path).replace(os.sep, "-")).with_suffix(".nq").name
    output_nq_path = output_dir / output_filename

    output_nq_path.write_text(nquads_output, encoding="utf-8")

    # Round-trip check: re-parse the file just written and compare quad
    # counts as a cheap integrity checksum.
    output_graph = Dataset(default_union=True)
    output_graph.parse(output_nq_path, format="nquads")
    output_quad_count = len(output_graph)

    if input_quad_count != output_quad_count:
        console.print(f"[red]Checksum failed for {zip_path}: Input={input_quad_count}, Output={output_quad_count}[/red]")
        return False
    return True
def main() -> None:
    """CLI entry point: convert every se.zip under input_dir to N-Quads.

    Fans the conversions out over a process pool and reports per-file
    success/failure counts at the end.
    """
    parser = argparse.ArgumentParser(
        description="Converts JSON-LD files from se.zip to N-Quads.",
        formatter_class=RichHelpFormatter
    )
    parser.add_argument("input_dir", type=str, help="Input directory containing se.zip files (recursive search)")
    parser.add_argument("output_dir", type=str, help="Output directory for the converted .nq files")
    parser.add_argument("-w", "--workers", type=int, default=None, help="Number of worker processes (defaults to CPU count)")
    args = parser.parse_args()

    input_path = Path(args.input_dir).resolve()
    output_path = Path(args.output_dir).resolve()
    # Falsy values (None, 0) fall back to the machine's CPU count.
    num_workers = args.workers if args.workers else multiprocessing.cpu_count()

    output_path.mkdir(parents=True, exist_ok=True)

    zip_files = list(input_path.rglob("se.zip"))
    total_files = len(zip_files)

    print(f"Found {total_files} se.zip files in {input_path}")
    print(f"Output directory: {output_path}")
    print(f"Workers: {num_workers}")

    success_count = 0
    fail_count = 0

    # Bind the fixed directory arguments so the pool maps over zip paths only.
    task_func = partial(process_zip_file, output_dir=output_path, input_dir_path=input_path)

    with ProcessPool(max_workers=num_workers) as pool:
        future = pool.map(task_func, zip_files)
        iterator = future.result()

        with Progress(
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
            console=Console(),
        ) as progress:
            task = progress.add_task("Converting", total=total_files)
            while True:
                try:
                    result = next(iterator)
                except StopIteration:
                    # BUGFIX: do not advance the bar here. The original
                    # `finally` clause advanced once more when StopIteration
                    # fired, overshooting the task total by one step.
                    break
                except Exception as e:
                    # A worker raised; count it as a failure and keep going.
                    console.print(f"[red]Error: {e}[/red]")
                    fail_count += 1
                else:
                    if result:
                        success_count += 1
                    else:
                        fail_count += 1
                # Exactly one advance per item actually consumed.
                progress.update(task, advance=1)

    print()
    print("Final report")
    print(f" Success: {success_count}")
    print(f" Failed: {fail_count}")
# Script entry point; excluded from coverage measurement.
if __name__ == "__main__": # pragma: no cover
    main()