Coverage for oc_meta / run / migration / provenance_to_nquads.py: 20%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# Copyright 2026, Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17import argparse 

18import multiprocessing 

19import os 

20import zipfile 

21from functools import partial 

22from pathlib import Path 

23 

24from pebble import ProcessPool 

25from rdflib import Dataset 

26from rich.console import Console 

27from rich.progress import (BarColumn, MofNCompleteColumn, Progress, TextColumn, 

28 TimeElapsedColumn, TimeRemainingColumn) 

29from rich_argparse import RichHelpFormatter 

30 

31console = Console(stderr=True) 

32 

33 

def convert_jsonld_to_nquads(jsonld_content: str) -> tuple[Dataset, str]:
    """Parse a JSON-LD document and serialize it as N-Quads.

    Returns both the parsed Dataset and its N-Quads serialization, so the
    caller can inspect the quad count without having to re-parse.
    """
    dataset = Dataset(default_union=True)
    dataset.parse(data=jsonld_content, format="json-ld")
    serialized = dataset.serialize(format="nquads")
    return dataset, serialized

39 

40 

def process_zip_file(zip_path: Path, output_dir: Path, input_dir_path: Path) -> bool:
    """Convert the first JSON-LD entry of *zip_path* to an N-Quads file.

    The output filename is derived from the zip's path relative to
    *input_dir_path*, with path separators flattened to dashes so all
    results land directly inside *output_dir*.  After writing, the .nq
    file is re-parsed and its quad count compared against the input's
    as a round-trip checksum.

    Args:
        zip_path: Path to a se.zip archive containing one .json member.
        output_dir: Directory that receives the generated .nq file.
        input_dir_path: Root used to compute the relative output name.

    Returns:
        True on success; False when the archive holds no .json entry or
        the round-trip quad counts disagree.
    """
    with zipfile.ZipFile(zip_path, "r") as zf:
        json_files = [name for name in zf.namelist() if name.endswith(".json")]
        if not json_files:
            # Previously this crashed with IndexError on json_files[0];
            # report it as a per-file failure instead of an opaque error.
            console.print(f"[red]No .json entry found in {zip_path}[/red]")
            return False
        jsonld_content = zf.read(json_files[0]).decode("utf-8")

    input_graph, nquads_output = convert_jsonld_to_nquads(jsonld_content)
    input_quad_count = len(input_graph)

    # Flatten e.g. "br/0610/se.zip" into "br-0610-se.nq".
    relative_path = zip_path.relative_to(input_dir_path)
    output_filename = str(relative_path).replace(os.sep, "-")
    output_filename = Path(output_filename).with_suffix(".nq").name
    output_nq_path = output_dir / output_filename

    with open(output_nq_path, "w", encoding="utf-8") as f:
        f.write(nquads_output)

    # Round-trip verification: re-parse what was written and compare counts.
    output_graph = Dataset(default_union=True)
    output_graph.parse(output_nq_path, format="nquads")
    output_quad_count = len(output_graph)

    if input_quad_count != output_quad_count:
        console.print(f"[red]Checksum failed for {zip_path}: Input={input_quad_count}, Output={output_quad_count}[/red]")
        return False
    return True

66 

67 

def main() -> None:
    """CLI entry point: convert every se.zip under input_dir to N-Quads.

    Archives are discovered recursively, converted in parallel with a
    pebble ProcessPool, tracked with a rich progress bar, and a final
    success/failure tally is printed.
    """
    parser = argparse.ArgumentParser(
        description="Converts JSON-LD files from se.zip to N-Quads.",
        formatter_class=RichHelpFormatter
    )
    parser.add_argument("input_dir", type=str, help="Input directory containing se.zip files (recursive search)")
    parser.add_argument("output_dir", type=str, help="Output directory for the converted .nq files")
    parser.add_argument("-w", "--workers", type=int, default=None, help="Number of worker processes (defaults to CPU count)")
    args = parser.parse_args()

    input_path = Path(args.input_dir).resolve()
    output_path = Path(args.output_dir).resolve()
    num_workers = args.workers if args.workers else multiprocessing.cpu_count()

    output_path.mkdir(parents=True, exist_ok=True)

    zip_files = list(input_path.rglob("se.zip"))
    total_files = len(zip_files)

    print(f"Found {total_files} se.zip files in {input_path}")
    print(f"Output directory: {output_path}")
    print(f"Workers: {num_workers}")

    success_count = 0
    fail_count = 0

    # Bind the fixed arguments; each worker only receives the varying zip path.
    task_func = partial(process_zip_file, output_dir=output_path, input_dir_path=input_path)

    with ProcessPool(max_workers=num_workers) as pool:
        future = pool.map(task_func, zip_files)
        iterator = future.result()

        with Progress(
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
            console=Console(),
        ) as progress:
            task = progress.add_task("Converting", total=total_files)
            while True:
                try:
                    result = next(iterator)
                except StopIteration:
                    # BUG FIX: the old code advanced the bar in a `finally`,
                    # so StopIteration also bumped it and the bar over-ran
                    # its total by one. Break without advancing.
                    break
                except Exception as e:
                    # A worker raised (bad zip, parse error, ...): count it
                    # as a failure but keep consuming the remaining results.
                    console.print(f"[red]Error: {e}[/red]")
                    fail_count += 1
                    progress.update(task, advance=1)
                else:
                    if result:
                        success_count += 1
                    else:
                        fail_count += 1
                    progress.update(task, advance=1)

    print()
    print("Final report")
    print(f"  Success: {success_count}")
    print(f"  Failed: {fail_count}")

128 

129 

# Script entry point; excluded from coverage since it only dispatches to main().
if __name__ == "__main__":  # pragma: no cover
    main()