Coverage for oc_meta / run / count / triples.py: 18%
76 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 17:25 +0000
1"""Count RDF triples or quads in compressed or uncompressed files."""
3from __future__ import annotations
5import argparse
6import gzip
7import multiprocessing
8import zipfile
9from concurrent.futures import ProcessPoolExecutor, as_completed
10from pathlib import Path
12from rdflib import Dataset
13from rich.progress import (BarColumn, MofNCompleteColumn, Progress,
14 SpinnerColumn, TaskProgressColumn, TextColumn,
15 TimeElapsedColumn, TimeRemainingColumn)
16from rich_argparse import RichHelpFormatter
# Serialization formats whose statements carry a named-graph component;
# for these the reported unit is "quads", for everything else "triples".
QUAD_FORMATS = {"nquads", "trig"}
def parse_args() -> argparse.Namespace: # pragma: no cover
    """Build the CLI parser and return the parsed command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Count RDF triples or quads in compressed or uncompressed files.",
        formatter_class=RichHelpFormatter,
    )
    # Positional argument: the root of the search.
    parser.add_argument(
        "directory",
        type=Path,
        help="Directory containing the RDF files.",
    )
    # Optional flags, declared as (flag, kwargs) pairs and registered in order
    # so the generated --help output matches the declaration order.
    option_specs: list[tuple[str, dict]] = [
        (
            "--pattern",
            {
                "default": "*.nq.gz",
                "help": "Glob pattern for locating files (default: '*.nq.gz').",
            },
        ),
        (
            "--format",
            {
                "default": "nquads",
                "choices": ["nquads", "json-ld", "turtle", "trig"],
                "help": "RDF format of the input files (default: nquads).",
            },
        ),
        (
            "--recursive",
            {
                "action": "store_true",
                "help": "Search recursively under the provided directory.",
            },
        ),
        (
            "--prov-only",
            {
                "action": "store_true",
                "help": "Count only files in 'prov' subdirectories.",
            },
        ),
        (
            "--data-only",
            {
                "action": "store_true",
                "help": "Count only files not in 'prov' subdirectories.",
            },
        ),
        (
            "--workers",
            {
                "type": int,
                "default": None,
                "help": "Number of parallel workers (default: CPU count).",
            },
        ),
        (
            "--show-per-file",
            {
                "action": "store_true",
                "help": "Print the count for each processed file.",
            },
        ),
        (
            "--keep-going",
            {
                "action": "store_true",
                "help": "Continue processing even if errors occur.",
            },
        ),
    ]
    for flag, kwargs in option_specs:
        parser.add_argument(flag, **kwargs)
    return parser.parse_args()
def discover_files(
    directory: Path,
    pattern: str,
    recursive: bool,
    prov_only: bool,
    data_only: bool,
) -> list[Path]:
    """Return the sorted files under *directory* matching *pattern*.

    Files inside a ``prov`` path component are kept or dropped according
    to the *prov_only* / *data_only* filters.

    Raises:
        ValueError: if *directory* does not exist or is not a directory.
    """
    root = directory.expanduser().resolve()
    if not root.is_dir():
        raise ValueError(f"'{root}' does not exist or is not a directory.")

    candidates = root.rglob(pattern) if recursive else root.glob(pattern)

    def _wanted(candidate: Path) -> bool:
        # Keep regular files only, then apply the prov/data filters.
        if not candidate.is_file():
            return False
        in_prov = "prov" in candidate.parts
        if prov_only and not in_prov:
            return False
        if data_only and in_prov:
            return False
        return True

    return sorted(c for c in candidates if _wanted(c))
def count_in_file(file_path: Path, rdf_format: str) -> tuple[str, int, str | None]:
    """Parse one file and count its triples/quads.

    Supports ``.zip`` archives, ``.gz`` files, and plain text. Returns a
    ``(path, count, error)`` tuple where *error* is ``None`` on success;
    any exception is captured as a string so a worker process never
    crashes the pool.
    """
    try:
        # default_union=True so len(dataset) counts statements across all
        # named graphs, not just the default graph.
        dataset: Dataset = Dataset(default_union=True)
        suffix = file_path.suffix.lower()
        if suffix == ".zip":
            # Parse EVERY archive member, not just the first one: a zip may
            # bundle several RDF files and all of them contribute to the
            # total (the previous code silently ignored all but namelist()[0]).
            with zipfile.ZipFile(file_path, "r") as archive:
                for info in archive.infolist():
                    if info.is_dir():
                        continue
                    with archive.open(info) as member:
                        content = member.read().decode("utf-8")
                    dataset.parse(data=content, format=rdf_format)
        elif suffix == ".gz":
            with gzip.open(file_path, "rt", encoding="utf-8") as f:
                dataset.parse(f, format=rdf_format)
        else:
            with open(file_path, "r", encoding="utf-8") as f:
                dataset.parse(f, format=rdf_format)
        return str(file_path), len(dataset), None
    except Exception as exc:  # deliberate broad catch: report, don't crash the worker
        return str(file_path), 0, str(exc)
def process_files(
    files: list[Path],
    rdf_format: str,
    max_workers: int | None,
    show_per_file: bool,
    keep_going: bool,
    unit_name: str,
) -> tuple[int, list[tuple[str, str]]]:
    """Count triples/quads across *files* using a process pool.

    Args:
        files: Files to process.
        rdf_format: rdflib parser format name.
        max_workers: Worker count; ``None`` falls back to the CPU count.
        show_per_file: Print an aligned per-file count table at the end.
        keep_going: If False, abort on the first failing file.
        unit_name: "triples" or "quads", used in the progress description.

    Returns:
        ``(total_count, failures)`` where failures is a list of
        ``(path, error_message)`` pairs.
    """
    workers = max_workers or multiprocessing.cpu_count()
    if workers < 1:
        workers = 1

    total_count = 0
    results: list[tuple[str, int]] = []
    failures: list[tuple[str, str]] = []

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
    ) as progress:
        task = progress.add_task(f"Counting {unit_name}", total=len(files))

        with ProcessPoolExecutor(max_workers=workers) as executor:
            futures = {
                executor.submit(count_in_file, fp, rdf_format): fp for fp in files
            }

            for future in as_completed(futures):
                file_path, count, error = future.result()

                if error:
                    failures.append((file_path, error))
                    if not keep_going:
                        progress.console.print(
                            f"[red]Error processing {file_path}: {error}[/red]"
                        )
                        progress.advance(task)
                        # Abort: drop queued work instead of letting the
                        # executor's __exit__ block until every remaining
                        # future has run to completion.
                        executor.shutdown(wait=False, cancel_futures=True)
                        break
                    progress.console.print(
                        f"[yellow]Error processing {file_path}: {error} (continuing)[/yellow]"
                    )
                else:
                    total_count += count
                    if show_per_file:
                        results.append((file_path, count))

                progress.advance(task)

    if show_per_file and results:
        # Align the counts in one column for readability.
        width = max(len(path) for path, _ in results)
        for path, count in sorted(results):
            print(f"{path.ljust(width)} : {count}")

    return total_count, failures
def main() -> None: # pragma: no cover
    """CLI entry point: validate flags, discover files, count, and report."""
    args = parse_args()

    if args.prov_only and args.data_only:
        print("Error: --prov-only and --data-only are mutually exclusive.")
        return

    try:
        files = discover_files(
            args.directory, args.pattern, args.recursive, args.prov_only, args.data_only
        )
    except ValueError as exc:
        print(f"Error: {exc}")
        return

    if not files:
        print("No files found matching the provided pattern.")
        return

    # The unit label depends on whether the format carries named graphs.
    unit_name = "quads" if args.format in QUAD_FORMATS else "triples"
    total, failed = process_files(
        files, args.format, args.workers, args.show_per_file, args.keep_going, unit_name
    )

    print(f"Total {unit_name}: {total}")

    if failed:
        print(f"\nFiles with errors ({len(failed)}):")
        for path, error in failed:
            print(f"  {path}: {error}")
# Script entry guard; excluded from coverage along with the rest of the CLI glue.
if __name__ == "__main__": # pragma: no cover
    main()