Coverage for oc_meta / run / count / triples.py: 98%
163 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5"""Count RDF triples or quads in compressed or uncompressed files."""
7from __future__ import annotations
9import argparse
10import fnmatch
11import gzip
12import multiprocessing
13import os
14import zipfile
15from collections.abc import Iterable, Mapping, Sequence
16from functools import partial
17from pathlib import Path
18from typing import TextIO
20import orjson
21from rich_argparse import RichHelpFormatter
23from oc_meta.lib.console import create_progress
24from oc_meta.lib.file_manager import collect_files
# Formats whose statements are quads (subject, predicate, object, graph).
QUAD_FORMATS = {"nquads"}
# Formats with exactly one statement per line, countable without RDF parsing.
LINE_BASED_FORMATS = {"nquads", "nt"}
# JSON-LD keywords that do not themselves contribute predicate triples.
JSONLD_SPECIAL_KEYS = {"@id", "@context", "@graph", "@list", "@set", "@language", "@value"}
31def _count_jsonld_value(value: object) -> int:
32 """Count triples generated by a JSON-LD property value."""
33 if isinstance(value, list):
34 return sum(_count_jsonld_value(item) for item in value)
35 if isinstance(value, dict):
36 if "@list" in value:
37 return _count_jsonld_value(value["@list"])
38 if "@set" in value:
39 return _count_jsonld_value(value["@set"])
40 if "@value" in value:
41 return 1
42 if "@id" in value and len(value) == 1:
43 return 1
44 return 1 + _count_jsonld_object(value)
45 return 1
def _count_jsonld_object(obj: Mapping[str, object]) -> int:
    """Return the triple count contributed by a single JSON-LD node object.

    JSON-LD keywords (``@id``, ``@context``, ...) are skipped, ``@type``
    yields one ``rdf:type`` triple per listed type, and every other property
    delegates to :func:`_count_jsonld_value`.
    """
    total = 0
    for prop, prop_value in obj.items():
        if prop == "@type":
            # One rdf:type statement per declared type.
            total += len(prop_value) if isinstance(prop_value, list) else 1
        elif prop not in JSONLD_SPECIAL_KEYS:
            total += _count_jsonld_value(prop_value)
    return total
def _count_jsonld_triples(data: Mapping[str, object] | Sequence[object]) -> int:
    """Count the triples encoded by an already-parsed JSON-LD document.

    ``data`` may be a single node object, a document wrapping a top-level
    ``@graph`` array, or a list mixing both shapes. Anything else yields 0.
    """
    def graph_total(graph: object) -> int:
        # Only list-valued @graph entries contribute; other shapes count 0.
        if not isinstance(graph, list):
            return 0
        return sum(
            _count_jsonld_object(node) for node in graph if isinstance(node, dict)
        )

    if isinstance(data, Sequence) and not isinstance(data, (str, bytes)):
        running = 0
        for entry in data:
            if not isinstance(entry, dict):
                continue
            if "@graph" in entry:
                running += graph_total(entry["@graph"])
            else:
                running += _count_jsonld_object(entry)
        return running

    if isinstance(data, Mapping):
        if "@graph" in data and isinstance(data["@graph"], list):
            return graph_total(data["@graph"])
        # No usable @graph: count the node's own properties.
        return _count_jsonld_object(data)

    return 0
def parse_args() -> argparse.Namespace:  # pragma: no cover
    """Build the CLI parser and return the parsed command-line arguments."""
    cli = argparse.ArgumentParser(
        description="Count RDF triples or quads in compressed or uncompressed files.",
        formatter_class=RichHelpFormatter,
    )
    cli.add_argument(
        "directory",
        type=Path,
        help="Directory containing the RDF files.",
    )
    cli.add_argument(
        "--pattern",
        default="*.nq.gz",
        help="Glob pattern for locating files (default: '*.nq.gz').",
    )
    cli.add_argument(
        "--format",
        default="nquads",
        choices=["nquads", "nt", "json-ld"],
        help="RDF format of the input files (default: nquads).",
    )
    # Boolean switches controlling file discovery.
    for flag, flag_help in (
        ("--recursive", "Search recursively under the provided directory."),
        ("--prov-only", "Count only files in 'prov' subdirectories."),
        ("--data-only", "Count only files not in 'prov' subdirectories."),
    ):
        cli.add_argument(flag, action="store_true", help=flag_help)
    cli.add_argument(
        "--workers",
        type=int,
        default=None,
        help="Number of parallel workers (default: CPU count).",
    )
    # Boolean switches controlling reporting behaviour.
    for flag, flag_help in (
        ("--show-per-file", "Print the count for each processed file."),
        ("--keep-going", "Continue processing even if errors occur."),
    ):
        cli.add_argument(flag, action="store_true", help=flag_help)
    return cli.parse_args()
def discover_files(
    directory: Path,
    pattern: str,
    recursive: bool,
    prov_only: bool,
    data_only: bool,
) -> list[Path]:
    """Locate files matching *pattern*, optionally filtered by prov subdirs.

    Args:
        directory: Root directory to search.
        pattern: Glob pattern matched against file names.
        recursive: Search the whole tree instead of only *directory* itself.
        prov_only: Keep only files inside a ``prov`` directory.
        data_only: Keep only files outside any ``prov`` directory.

    Returns:
        Sorted list of matching paths.

    Raises:
        ValueError: if *directory* does not exist or is not a directory.
    """
    path = directory.expanduser().resolve()
    if not path.is_dir():
        raise ValueError(f"'{path}' does not exist or is not a directory.")

    root_str = str(path)

    if recursive:

        def path_filter(p: str) -> bool:
            # Treat "prov" as a whole path component. The previous substring
            # test ("/prov" in p) also matched directories such as
            # "provenance" or "provider", and broke on Windows separators.
            is_prov = "prov" in Path(p).parts
            if prov_only and not is_prov:
                return False
            if data_only and is_prov:
                return False
            return True

        str_files = collect_files(root_str, pattern, path_filter)
        return sorted(Path(f) for f in str_files)

    # Non-recursive: classify the directory itself, then scan one level.
    is_prov = path.name == "prov"
    if (prov_only and not is_prov) or (data_only and is_prov):
        return []

    files: list[Path] = []
    for entry in os.scandir(root_str):
        if entry.is_file() and fnmatch.fnmatch(entry.name, pattern):
            files.append(Path(entry.path))
    return sorted(files)
184def _count_lines_binary(file_obj: Iterable[bytes]) -> int:
185 count = 0
186 for line_num, line in enumerate(file_obj, 1):
187 stripped = line.strip()
188 if not stripped or stripped.startswith(b"#"):
189 continue
190 if not stripped.endswith(b"."):
191 raise ValueError(f"line {line_num}: statement does not end with '.'")
192 count += 1
193 return count
196def _count_lines_text(file_obj: TextIO) -> int:
197 count = 0
198 for line_num, line in enumerate(file_obj, 1):
199 stripped = line.strip()
200 if not stripped or stripped.startswith("#"):
201 continue
202 if not stripped.endswith("."):
203 raise ValueError(f"line {line_num}: statement does not end with '.'")
204 count += 1
205 return count
def count_in_file(
    file_path: Path, rdf_format: str
) -> tuple[str, int, str | None]:
    """Count statements in one file and return ``(path, count, error)``.

    Handles ``.zip`` and ``.gz`` containers as well as plain files.
    Line-based formats are counted line by line; any other format is
    parsed as JSON-LD. Failures are reported through the third tuple
    slot instead of being raised, so pool workers never propagate.
    """
    path_str = str(file_path)
    try:
        line_based = rdf_format in LINE_BASED_FORMATS
        suffix = file_path.suffix.lower()

        if suffix == ".zip":
            with zipfile.ZipFile(file_path, "r") as archive:
                # Only the first archive member is inspected.
                member = archive.namelist()[0]
                with archive.open(member) as handle:
                    if line_based:
                        return path_str, _count_lines_binary(handle), None
                    document = orjson.loads(handle.read().decode("utf-8"))
        elif suffix == ".gz":
            with gzip.open(file_path, "rb") as handle:
                if line_based:
                    return path_str, _count_lines_binary(handle), None
                document = orjson.loads(handle.read())
        elif line_based:
            with open(file_path, "r", encoding="utf-8") as handle:
                return path_str, _count_lines_text(handle), None
        else:
            with open(file_path, "rb") as handle:
                document = orjson.loads(handle.read())

        # All non-line-based branches fall through here with parsed JSON-LD.
        return path_str, _count_jsonld_triples(document), None
    except Exception as exc:
        return path_str, 0, str(exc)
def process_files(
    files: list[Path],
    rdf_format: str,
    max_workers: int | None,
    show_per_file: bool,
    keep_going: bool,
    unit_name: str,
) -> tuple[int, list[tuple[str, str]]]:
    """Count statements in *files* in parallel and aggregate the totals.

    Args:
        files: Paths to process.
        rdf_format: Format name forwarded to ``count_in_file``.
        max_workers: Pool size; falls back to the CPU count when ``None``.
        show_per_file: When true, print a sorted per-file breakdown at the end.
        keep_going: When true, log each failure and continue; otherwise stop
            the pool at the first error.
        unit_name: Label (e.g. "triples"/"quads") used in progress output.

    Returns:
        A ``(total_count, failures)`` pair where ``failures`` is a list of
        ``(file_path, error_message)`` tuples.
    """
    workers = max_workers or multiprocessing.cpu_count()
    if workers < 1:
        workers = 1

    total_count = 0
    results: list[tuple[str, int]] = []
    failures: list[tuple[str, str]] = []
    # Batch work so each worker receives several files per dispatch.
    chunksize = max(1, len(files) // (workers * 4))

    worker_fn = partial(count_in_file, rdf_format=rdf_format)

    with create_progress() as progress:
        task = progress.add_task(f"Counting {unit_name}", total=len(files))

        # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
        ctx = multiprocessing.get_context('forkserver')
        with ctx.Pool(processes=workers) as pool:
            for file_path, count, error in pool.imap_unordered(
                worker_fn, files, chunksize=chunksize
            ):
                if error:
                    failures.append((file_path, error))
                    if not keep_going:
                        # Fail fast: report, tick the bar, and kill the pool.
                        progress.console.print(
                            f"[red]Error processing {file_path}: {error}[/red]"
                        )
                        progress.advance(task)
                        pool.terminate()
                        break
                    progress.console.print(
                        f"[yellow]Error processing {file_path}: {error} (continuing)[/yellow]"
                    )
                else:
                    total_count += count
                    if show_per_file:
                        results.append((file_path, count))

                progress.advance(task)

    if show_per_file and results:
        # Align the counts column on the longest path.
        width = max(len(path) for path, _ in results)
        for path, count in sorted(results):
            print(f"{path.ljust(width)} : {count}")

    return total_count, failures
def main() -> None:  # pragma: no cover
    """CLI entry point: discover files, count statements, print a summary."""
    args = parse_args()

    # The two prov filters contradict each other; refuse the combination.
    if args.prov_only and args.data_only:
        print("Error: --prov-only and --data-only are mutually exclusive.")
        return

    try:
        files = discover_files(
            directory=args.directory,
            pattern=args.pattern,
            recursive=args.recursive,
            prov_only=args.prov_only,
            data_only=args.data_only,
        )
    except ValueError as exc:
        print(f"Error: {exc}")
        return

    if not files:
        print("No files found matching the provided pattern.")
        return

    unit_name = "quads" if args.format in QUAD_FORMATS else "triples"

    grand_total, errored = process_files(
        files=files,
        rdf_format=args.format,
        max_workers=args.workers,
        show_per_file=args.show_per_file,
        keep_going=args.keep_going,
        unit_name=unit_name,
    )

    print(f"Total {unit_name}: {grand_total}")

    if errored:
        print(f"\nFiles with errors ({len(errored)}):")
        for failed_path, message in errored:
            print(f"  {failed_path}: {message}")
# Allow the module to be executed directly as a script.
if __name__ == "__main__":  # pragma: no cover
    main()