Coverage for oc_meta / run / count / triples.py: 98%

163 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5"""Count RDF triples or quads in compressed or uncompressed files.""" 

6 

7from __future__ import annotations 

8 

9import argparse 

10import fnmatch 

11import gzip 

12import multiprocessing 

13import os 

14import zipfile 

15from collections.abc import Iterable, Mapping, Sequence 

16from functools import partial 

17from pathlib import Path 

18from typing import TextIO 

19 

20import orjson 

21from rich_argparse import RichHelpFormatter 

22 

23from oc_meta.lib.console import create_progress 

24from oc_meta.lib.file_manager import collect_files 

25 

# Formats whose statements are quads (subject, predicate, object, graph).
QUAD_FORMATS = {"nquads"}
# Formats where every non-blank, non-comment line is exactly one statement.
LINE_BASED_FORMATS = {"nquads", "nt"}
# JSON-LD keywords that do not themselves yield triples when iterating an
# object's keys. "@type" is intentionally absent: it is handled separately
# because each type value produces one triple.
JSONLD_SPECIAL_KEYS = {"@id", "@context", "@graph", "@list", "@set", "@language", "@value"}

29 

30 

31def _count_jsonld_value(value: object) -> int: 

32 """Count triples generated by a JSON-LD property value.""" 

33 if isinstance(value, list): 

34 return sum(_count_jsonld_value(item) for item in value) 

35 if isinstance(value, dict): 

36 if "@list" in value: 

37 return _count_jsonld_value(value["@list"]) 

38 if "@set" in value: 

39 return _count_jsonld_value(value["@set"]) 

40 if "@value" in value: 

41 return 1 

42 if "@id" in value and len(value) == 1: 

43 return 1 

44 return 1 + _count_jsonld_object(value) 

45 return 1 

46 

47 

48def _count_jsonld_object(obj: Mapping[str, object]) -> int: 

49 """Count triples for a single JSON-LD object.""" 

50 count = 0 

51 for key, value in obj.items(): 

52 if key in JSONLD_SPECIAL_KEYS: 

53 continue 

54 if key == "@type": 

55 if isinstance(value, list): 

56 count += len(value) 

57 else: 

58 count += 1 

59 else: 

60 count += _count_jsonld_value(value) 

61 return count 

62 

63 

64def _count_jsonld_triples(data: Mapping[str, object] | Sequence[object]) -> int: 

65 """Count triples in a JSON-LD document without RDFLib parsing.""" 

66 if isinstance(data, Sequence) and not isinstance(data, (str, bytes)): 

67 total = 0 

68 for obj in data: 

69 if isinstance(obj, dict): 

70 if "@graph" in obj: 

71 graph = obj["@graph"] 

72 if isinstance(graph, list): 

73 total += sum( 

74 _count_jsonld_object(item) 

75 for item in graph 

76 if isinstance(item, dict) 

77 ) 

78 else: 

79 total += _count_jsonld_object(obj) 

80 return total 

81 if isinstance(data, Mapping): 

82 if "@graph" in data: 

83 graph = data["@graph"] 

84 if isinstance(graph, list): 

85 return sum( 

86 _count_jsonld_object(obj) for obj in graph if isinstance(obj, dict) 

87 ) 

88 return _count_jsonld_object(data) 

89 return 0 

90 

91 

def parse_args() -> argparse.Namespace:  # pragma: no cover
    """Build the command-line parser and parse ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description="Count RDF triples or quads in compressed or uncompressed files.",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "directory",
        type=Path,
        help="Directory containing the RDF files.",
    )
    parser.add_argument(
        "--pattern",
        default="*.nq.gz",
        help="Glob pattern for locating files (default: '*.nq.gz').",
    )
    parser.add_argument(
        "--format",
        default="nquads",
        choices=["nquads", "nt", "json-ld"],
        help="RDF format of the input files (default: nquads).",
    )
    # Boolean switches, declared in help-listing order.
    for flag, flag_help in (
        ("--recursive", "Search recursively under the provided directory."),
        ("--prov-only", "Count only files in 'prov' subdirectories."),
        ("--data-only", "Count only files not in 'prov' subdirectories."),
    ):
        parser.add_argument(flag, action="store_true", help=flag_help)
    parser.add_argument(
        "--workers",
        type=int,
        default=None,
        help="Number of parallel workers (default: CPU count).",
    )
    for flag, flag_help in (
        ("--show-per-file", "Print the count for each processed file."),
        ("--keep-going", "Continue processing even if errors occur."),
    ):
        parser.add_argument(flag, action="store_true", help=flag_help)
    return parser.parse_args()

145 

146 

def discover_files(
    directory: Path,
    pattern: str,
    recursive: bool,
    prov_only: bool,
    data_only: bool,
) -> list[Path]:
    """Locate RDF files under *directory* matching *pattern*.

    Args:
        directory: Root directory to search.
        pattern: Glob pattern matched against file names.
        recursive: Search subdirectories (via ``collect_files``) when True.
        prov_only: Keep only files inside a 'prov' directory.
        data_only: Keep only files outside any 'prov' directory.

    Returns:
        Sorted list of matching file paths.

    Raises:
        ValueError: If *directory* does not exist or is not a directory.
    """
    path = directory.expanduser().resolve()
    if not path.is_dir():
        raise ValueError(f"'{path}' does not exist or is not a directory.")

    root_str = str(path)

    if recursive:

        def path_filter(p: str) -> bool:
            # Match 'prov' as a literal path component, not a substring:
            # '/provider/' or '/provenance/' must not be misclassified.
            # Normalize separators so Windows paths are handled too.
            is_prov = "prov" in p.replace(os.sep, "/").split("/")
            if prov_only and not is_prov:
                return False
            if data_only and is_prov:
                return False
            return True

        str_files = collect_files(root_str, pattern, path_filter)
        return sorted(Path(f) for f in str_files)

    # Non-recursive: only the directory itself decides prov membership.
    is_prov = path.name == "prov"
    if (prov_only and not is_prov) or (data_only and is_prov):
        return []

    files: list[Path] = []
    for entry in os.scandir(root_str):
        if entry.is_file() and fnmatch.fnmatch(entry.name, pattern):
            files.append(Path(entry.path))
    return sorted(files)

182 

183 

184def _count_lines_binary(file_obj: Iterable[bytes]) -> int: 

185 count = 0 

186 for line_num, line in enumerate(file_obj, 1): 

187 stripped = line.strip() 

188 if not stripped or stripped.startswith(b"#"): 

189 continue 

190 if not stripped.endswith(b"."): 

191 raise ValueError(f"line {line_num}: statement does not end with '.'") 

192 count += 1 

193 return count 

194 

195 

196def _count_lines_text(file_obj: TextIO) -> int: 

197 count = 0 

198 for line_num, line in enumerate(file_obj, 1): 

199 stripped = line.strip() 

200 if not stripped or stripped.startswith("#"): 

201 continue 

202 if not stripped.endswith("."): 

203 raise ValueError(f"line {line_num}: statement does not end with '.'") 

204 count += 1 

205 return count 

206 

207 

def count_in_file(
    file_path: Path, rdf_format: str
) -> tuple[str, int, str | None]:
    """Count RDF statements in a single (possibly compressed) file.

    Line-based formats are counted line by line without parsing; JSON-LD
    files are loaded with orjson and counted structurally.

    Args:
        file_path: File to process (.zip, .gz, or uncompressed).
        rdf_format: One of the formats accepted by the CLI.

    Returns:
        Tuple of (path string, statement count, error message or None).
        Exceptions are captured and reported via the third element so that
        worker processes never propagate them.
    """
    try:
        suffix = file_path.suffix.lower()
        use_line_count = rdf_format in LINE_BASED_FORMATS

        if suffix == ".zip":
            with zipfile.ZipFile(file_path, "r") as z:
                # Skip directory entries: the first namelist() item is not
                # guaranteed to be a regular file, and opening a directory
                # entry would silently yield a count of 0.
                members = [info for info in z.infolist() if not info.is_dir()]
                if not members:
                    raise ValueError("zip archive contains no file entries")
                with z.open(members[0]) as f:
                    if use_line_count:
                        return str(file_path), _count_lines_binary(f), None
                    # orjson accepts UTF-8 bytes directly; no decode needed.
                    data = orjson.loads(f.read())
                    return str(file_path), _count_jsonld_triples(data), None
        elif suffix == ".gz":
            if use_line_count:
                with gzip.open(file_path, "rb") as f:
                    return str(file_path), _count_lines_binary(f), None
            with gzip.open(file_path, "rb") as f:
                data = orjson.loads(f.read())
                return str(file_path), _count_jsonld_triples(data), None
        else:
            if use_line_count:
                with open(file_path, "r", encoding="utf-8") as f:
                    return str(file_path), _count_lines_text(f), None
            with open(file_path, "rb") as f:
                data = orjson.loads(f.read())
                return str(file_path), _count_jsonld_triples(data), None
    except Exception as exc:
        # Never raise from a worker: report the failure to the parent.
        return str(file_path), 0, str(exc)

240 

241 

def process_files(
    files: list[Path],
    rdf_format: str,
    max_workers: int | None,
    show_per_file: bool,
    keep_going: bool,
    unit_name: str,
) -> tuple[int, list[tuple[str, str]]]:
    """Count statements in *files* in parallel and aggregate the totals.

    Args:
        files: Paths of the RDF files to process.
        rdf_format: Format name forwarded to count_in_file.
        max_workers: Worker-process count; None or 0 falls back to CPU count.
        show_per_file: Print a per-file count table after processing.
        keep_going: Continue after per-file errors instead of aborting.
        unit_name: Label (e.g. "triples" or "quads") shown in the progress bar.

    Returns:
        Tuple of (total statement count, list of (path, error) failures).
        On abort (keep_going=False) the total covers only files processed
        before the first error.
    """
    workers = max_workers or multiprocessing.cpu_count()
    if workers < 1:
        workers = 1

    total_count = 0
    results: list[tuple[str, int]] = []
    failures: list[tuple[str, str]] = []
    # Batch several files per worker dispatch to cut IPC overhead.
    chunksize = max(1, len(files) // (workers * 4))

    worker_fn = partial(count_in_file, rdf_format=rdf_format)

    with create_progress() as progress:
        task = progress.add_task(f"Counting {unit_name}", total=len(files))

        # Use forkserver to avoid deadlocks when forking in a multi-threaded environment
        ctx = multiprocessing.get_context('forkserver')
        with ctx.Pool(processes=workers) as pool:
            # imap_unordered: results arrive as workers finish, not in order.
            for file_path, count, error in pool.imap_unordered(
                worker_fn, files, chunksize=chunksize
            ):
                if error:
                    failures.append((file_path, error))
                    if not keep_going:
                        progress.console.print(
                            f"[red]Error processing {file_path}: {error}[/red]"
                        )
                        progress.advance(task)
                        # Abort: kill remaining workers and leave the loop.
                        pool.terminate()
                        break
                    progress.console.print(
                        f"[yellow]Error processing {file_path}: {error} (continuing)[/yellow]"
                    )
                else:
                    total_count += count
                    if show_per_file:
                        results.append((file_path, count))

                # Reached on both success and keep-going error paths.
                progress.advance(task)

    if show_per_file and results:
        # Left-pad paths to a common width so counts align in one column.
        width = max(len(path) for path, _ in results)
        for path, count in sorted(results):
            print(f"{path.ljust(width)} : {count}")

    return total_count, failures

295 

296 

def main() -> None:  # pragma: no cover
    """CLI entry point: parse arguments, discover files, report counts."""
    args = parse_args()

    # The two filters contradict each other; refuse the combination.
    if args.prov_only and args.data_only:
        print("Error: --prov-only and --data-only are mutually exclusive.")
        return

    try:
        files = discover_files(
            args.directory,
            args.pattern,
            args.recursive,
            args.prov_only,
            args.data_only,
        )
    except ValueError as exc:
        print(f"Error: {exc}")
        return

    if not files:
        print("No files found matching the provided pattern.")
        return

    # N-Quads statements are quads; every other supported format is triples.
    unit_name = "quads" if args.format in QUAD_FORMATS else "triples"

    total, failures = process_files(
        files,
        args.format,
        args.workers,
        args.show_per_file,
        args.keep_going,
        unit_name,
    )

    print(f"Total {unit_name}: {total}")

    if not failures:
        return
    print(f"\nFiles with errors ({len(failures)}):")
    for path, error in failures:
        print(f" {path}: {error}")

337 

338 

# Allow direct execution as a script.
if __name__ == "__main__":  # pragma: no cover
    main()