Coverage for oc_meta/run/count/triples.py: 18%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 17:25 +0000

1"""Count RDF triples or quads in compressed or uncompressed files.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import gzip 

7import multiprocessing 

8import zipfile 

9from concurrent.futures import ProcessPoolExecutor, as_completed 

10from pathlib import Path 

11 

12from rdflib import Dataset 

13from rich.progress import (BarColumn, MofNCompleteColumn, Progress, 

14 SpinnerColumn, TaskProgressColumn, TextColumn, 

15 TimeElapsedColumn, TimeRemainingColumn) 

16from rich_argparse import RichHelpFormatter 

17 

# Serialization formats that carry named-graph (context) information;
# statements in these formats are reported as "quads" instead of "triples".
QUAD_FORMATS = {"nquads", "trig"}

19 

20 

def parse_args() -> argparse.Namespace:  # pragma: no cover
    """Build the command-line interface and return the parsed arguments."""
    parser = argparse.ArgumentParser(
        description="Count RDF triples or quads in compressed or uncompressed files.",
        formatter_class=RichHelpFormatter,
    )
    parser.add_argument(
        "directory",
        type=Path,
        help="Directory containing the RDF files.",
    )
    parser.add_argument(
        "--pattern",
        default="*.nq.gz",
        help="Glob pattern for locating files (default: '*.nq.gz').",
    )
    parser.add_argument(
        "--format",
        default="nquads",
        choices=["nquads", "json-ld", "turtle", "trig"],
        help="RDF format of the input files (default: nquads).",
    )
    # These three switches are identical store_true flags; build them from a
    # spec so the declarations stay compact (order preserved for --help).
    for flag, help_text in (
        ("--recursive", "Search recursively under the provided directory."),
        ("--prov-only", "Count only files in 'prov' subdirectories."),
        ("--data-only", "Count only files not in 'prov' subdirectories."),
    ):
        parser.add_argument(flag, action="store_true", help=help_text)
    parser.add_argument(
        "--workers",
        type=int,
        default=None,
        help="Number of parallel workers (default: CPU count).",
    )
    parser.add_argument(
        "--show-per-file",
        action="store_true",
        help="Print the count for each processed file.",
    )
    parser.add_argument(
        "--keep-going",
        action="store_true",
        help="Continue processing even if errors occur.",
    )
    return parser.parse_args()

74 

75 

def discover_files(
    directory: Path,
    pattern: str,
    recursive: bool,
    prov_only: bool,
    data_only: bool,
) -> list[Path]:
    """Return the sorted list of files under *directory* matching *pattern*.

    Files are classified as provenance when any path component equals
    ``prov``; *prov_only* / *data_only* filter on that classification.

    Raises:
        ValueError: if *directory* does not exist or is not a directory.
    """
    root = directory.expanduser().resolve()
    if not root.is_dir():
        raise ValueError(f"'{root}' does not exist or is not a directory.")

    candidates = root.rglob(pattern) if recursive else root.glob(pattern)

    def _wanted(candidate: Path) -> bool:
        # Keep only regular files that pass the prov/data filters.
        if not candidate.is_file():
            return False
        in_prov = "prov" in candidate.parts
        if prov_only and not in_prov:
            return False
        if data_only and in_prov:
            return False
        return True

    return sorted(c for c in candidates if _wanted(c))

101 

102 

def count_in_file(file_path: Path, rdf_format: str) -> tuple[str, int, str | None]:
    """Count the statements (triples or quads) stored in *file_path*.

    Supports plain files, gzip-compressed files (``.gz``) and zip archives
    (``.zip``).  For zip archives every member is parsed — the previous
    implementation silently counted only the first entry of the archive.

    Args:
        file_path: Path to the RDF file to parse.
        rdf_format: rdflib format name (e.g. ``nquads``, ``trig``).

    Returns:
        A ``(path, count, error)`` tuple.  *error* is ``None`` on success;
        on failure the count is 0 and *error* carries the exception text,
        so worker processes never raise.
    """
    try:
        # default_union merges all named graphs, so len() counts every
        # statement regardless of its context.
        dataset: Dataset = Dataset(default_union=True)
        suffix = file_path.suffix.lower()
        if suffix == ".zip":
            with zipfile.ZipFile(file_path, "r") as z:
                for inner_name in z.namelist():
                    if inner_name.endswith("/"):  # skip directory entries
                        continue
                    with z.open(inner_name) as f:
                        content = f.read().decode("utf-8")
                    dataset.parse(data=content, format=rdf_format)
        elif suffix == ".gz":
            with gzip.open(file_path, "rt", encoding="utf-8") as f:
                dataset.parse(f, format=rdf_format)
        else:
            with open(file_path, "r", encoding="utf-8") as f:
                dataset.parse(f, format=rdf_format)
        return str(file_path), len(dataset), None
    except Exception as exc:
        # Deliberate broad catch: errors are reported back to the parent
        # process as data instead of crashing the worker pool.
        return str(file_path), 0, str(exc)

124 

125 

def process_files(
    files: list[Path],
    rdf_format: str,
    max_workers: int | None,
    show_per_file: bool,
    keep_going: bool,
    unit_name: str,
) -> tuple[int, list[tuple[str, str]]]:
    """Count the statements of *files* in parallel worker processes.

    Args:
        files: Paths to process.
        rdf_format: rdflib format name passed to the parser.
        max_workers: Number of worker processes; ``None`` or 0 means CPU count.
        show_per_file: Print a per-file summary after processing.
        keep_going: Continue after per-file errors instead of stopping.
        unit_name: "triples" or "quads"; only used in the progress label.

    Returns:
        ``(total_count, failures)`` where *failures* is a list of
        ``(path, error_message)`` pairs.
    """
    workers = max(1, max_workers or multiprocessing.cpu_count())

    total_count = 0
    results: list[tuple[str, int]] = []
    failures: list[tuple[str, str]] = []

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
    ) as progress:
        task = progress.add_task(f"Counting {unit_name}", total=len(files))

        with ProcessPoolExecutor(max_workers=workers) as executor:
            futures = {
                executor.submit(count_in_file, fp, rdf_format): fp for fp in files
            }

            for future in as_completed(futures):
                file_path, count, error = future.result()

                if error:
                    failures.append((file_path, error))
                    if not keep_going:
                        progress.console.print(
                            f"[red]Error processing {file_path}: {error}[/red]"
                        )
                        progress.advance(task)
                        # Fix: without cancelling, the executor's __exit__
                        # would still run every queued file before shutting
                        # down, defeating the early stop.
                        for pending in futures:
                            pending.cancel()
                        break
                    progress.console.print(
                        f"[yellow]Error processing {file_path}: {error} (continuing)[/yellow]"
                    )
                else:
                    total_count += count
                    if show_per_file:
                        results.append((file_path, count))

                progress.advance(task)

    if show_per_file and results:
        width = max(len(path) for path, _ in results)
        for path, count in sorted(results):
            print(f"{path.ljust(width)} : {count}")

    return total_count, failures

185 

186 

def main() -> None:  # pragma: no cover
    """CLI entry point: discover files, count their contents, print a summary."""
    args = parse_args()

    # The two filters are contradictory; applying both would match nothing.
    if args.prov_only and args.data_only:
        print("Error: --prov-only and --data-only are mutually exclusive.")
        return

    try:
        targets = discover_files(
            args.directory, args.pattern, args.recursive,
            args.prov_only, args.data_only,
        )
    except ValueError as exc:
        print(f"Error: {exc}")
        return

    if not targets:
        print("No files found matching the provided pattern.")
        return

    unit = "quads" if args.format in QUAD_FORMATS else "triples"
    total, failures = process_files(
        targets, args.format, args.workers,
        args.show_per_file, args.keep_going, unit,
    )

    print(f"Total {unit}: {total}")
    if failures:
        print(f"\nFiles with errors ({len(failures)}):")
        for path, message in failures:
            print(f"  {path}: {message}")

227 

228 

# Script entry point.
if __name__ == "__main__":  # pragma: no cover
    main()