Coverage for oc_validator / helper.py: 99%

135 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 15:46 +0000

1# ISC License 

2# 

3# Copyright (c) 2023-2026, Elia Rizzetto, Silvio Peroni 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any 

6# purpose with or without fee is hereby granted, provided that the above 

7# copyright notice and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, 

12# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM 

13# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR 

14# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR 

15# PERFORMANCE OF THIS SOFTWARE. 

16 

17from collections import defaultdict 

18from csv import field_size_limit, DictReader 

19from typing import Generator, Iterator, Optional 

20import json 

21 

22 

class UnionFind:
    """Union-Find (Disjoint Set Union) data structure for grouping related identifiers."""

    def __init__(self) -> None:
        """
        Initialise an empty Union-Find structure.

        :rtype: None
        """
        self.parent = dict()

    def find(self, x: str) -> str:
        """
        Return the root of the component containing *x*.

        An element never seen before is registered as its own root.
        Path compression is applied on every lookup.

        :param x: Element identifier.
        :type x: str
        :return: Root identifier of the component.
        :rtype: str
        """
        if x not in self.parent:
            self.parent[x] = x
        # Phase 1: walk up to the root of x's tree.
        root = x
        while self.parent[root] != root:
            root = self.parent[root]
        # Phase 2: path compression — repoint every node on the walk
        # directly at the root so later lookups are O(1).
        while self.parent[x] != root:
            self.parent[x], x = root, self.parent[x]
        return root

    def union(self, x: str, y: str) -> None:
        """
        Merge the components containing *x* and *y*.

        The root of *x* becomes a child of the root of *y*.

        :param x: First element.
        :type x: str
        :param y: Second element.
        :type y: str
        :rtype: None
        """
        root_x = self.find(x)
        root_y = self.find(y)
        self.parent[root_x] = root_y

65 

class Helper:
    """Container for utility functions used across the validation pipeline."""

    def __init__(self) -> None:
        """
        Initialise the Helper.

        :rtype: None
        """
        self.descr = 'contains helper functions'

    def group_ids(self, id_groups: list[set]) -> list[set]:
        """
        Merge identifiers that co-occur in the same row into connected components.

        Two identifiers are treated as belonging to the same bibliographic
        entity whenever they appear together in at least one input set; the
        transitive closure is computed with a Union-Find structure.

        :param id_groups: List of sets, each holding the identifiers found in
            a single row field (e.g. ``id``, ``citing_id``).
        :type id_groups: list[set]
        :return: List of sets, each grouping the IDs of the same entity.
        :rtype: list[set]
        """
        dsu = UnionFind()

        # Link every identifier in a set to the first one; the union is
        # transitive, so this connects the whole set.
        for id_set in id_groups:
            members = iter(id_set)
            anchor = next(members, None)
            for other in members:
                dsu.union(anchor, other)

        # Collect each identifier under the root of its component.
        clusters = {}
        for id_set in id_groups:
            for identifier in id_set:
                clusters.setdefault(dsu.find(identifier), set()).add(identifier)

        return list(clusters.values())

    def create_error_dict(self, validation_level: str, error_type: str, message: str, error_label: str, located_in: str,
                          table: dict, valid: bool = False) -> dict:
        """
        Create a dictionary representing a validation error or warning.

        :param validation_level: One of ``"csv_wellformedness"``, ``"external_syntax"``,
            ``"existence"``, or ``"semantic"``.
        :type validation_level: str
        :param error_type: One of ``"error"`` or ``"warning"``.
        :type error_type: str
        :param message: Human-readable error description.
        :type message: str
        :param error_label: Machine-readable label uniquely connected to one validation check.
        :type error_label: str
        :param located_in: Granularity of the error location — one of ``"row"``,
            ``"field"``, or ``"item"``.
        :type located_in: str
        :param table: Tree structure pinpointing the exact position of all elements
            involved in the error.
        :type table: dict
        :param valid: Whether the data is still considered valid despite the issue.
            Defaults to ``False``.
        :type valid: bool
        :return: Error dictionary consumable by the report writer.
        :rtype: dict
        """
        return {
            'validation_level': validation_level,
            'error_type': error_type,
            'error_label': error_label,
            'valid': valid,
            'message': message,
            'position': {
                'located_in': located_in,
                'table': table
            }
        }

    def create_validation_summary_stream(self, json_fp: str) -> Generator[str, None, None]:
        """
        Stream a natural-language summary of the validation error report.

        Makes two passes over the JSON-Lines file: the first tallies errors
        per label and records each label's explanation text, the second yields
        formatted lines grouped by error label.

        :param json_fp: Path to the JSON-Lines file containing the validation error report.
        :type json_fp: str
        :return: Generator yielding lines of the summary.
        :rtype: Generator[str, None, None]
        """
        # ---- FIRST PASS: tally per-label counts and keep the first message ----
        counts = {}
        explanations = {}

        with JSONLStreamIO(json_fp) as jsonl_stream:
            for record in jsonl_stream:
                lbl = record['error_label']
                if lbl not in counts:
                    counts[lbl] = 0
                    explanations[lbl] = record['message']
                counts[lbl] += 1

        # ---- SECOND PASS: stream the formatted output ----
        with JSONLStreamIO(json_fp) as jsonl_stream:

            seen = dict.fromkeys(counts, 0)

            for record in jsonl_stream:
                lbl = record['error_label']
                total = counts[lbl]

                # Emit the group header the first time a label is encountered.
                if seen[lbl] == 0:
                    if total > 1:
                        yield f"There are {total} {lbl} issues in the document.\n"
                    else:
                        yield f"There is {total} {lbl} issue in the document.\n"
                    yield explanations[lbl] + "\n"

                # ---- build the location string from the position tree ----
                location = "; ".join(
                    f"row {row_pos}, field {field_name}, "
                    f"and items in position {item_positions}"
                    for row_pos, fields in record['position']['table'].items()
                    for field_name, item_positions in fields.items()
                )

                # ---- detail line ----
                seen[lbl] += 1
                if total > 1:
                    yield f"- {record['error_type']} {seen[lbl]} involves: {location}.\n"
                else:
                    yield f"- The {record['error_type']} involves: {location}.\n"

                # Blank separator once a label's group is complete.
                if seen[lbl] == total:
                    yield "\n\n"

229 

230 

class CSVStreamReader:
    """
    A streamable CSV reader that yields rows one at a time, allowing for
    memory-efficient processing of large CSV files.

    Supports multiple passes by reopening the file. The delimiter is
    auto-detected from the first rows (tries comma, semicolon, and tab).
    """

    def __init__(self, csv_fp: str) -> None:
        """
        Initialise the reader and auto-detect the CSV delimiter and field names.

        :param csv_fp: Path to the CSV file to read.
        :type csv_fp: str
        :rtype: None
        """
        self.csv_fp = csv_fp
        self._delimiter: Optional[str] = None
        self._fieldnames: Optional[list[str]] = None
        self._detect_delimiter_and_fieldnames()

    def _detect_delimiter_and_fieldnames(self) -> None:
        """
        Detect the CSV delimiter and field names from the first rows.

        Tries ``','``, ``';'``, and ``'\\t'`` in order and selects the first
        one that yields a first row with more than one column.

        :raises ValueError: if no valid delimiter can be determined.
        :rtype: None
        """
        # Allow very large fields (100 MB) before parsing.
        field_size_limit(100000000)
        for candidate in (',', ';', '\t'):
            with open(self.csv_fp, newline='', encoding='utf-8') as handle:
                reader = DictReader(handle, delimiter=candidate)
                first = next(reader, None)  # None on an empty file
                # More than one parsed column means the split looks right.
                if first and len(first) > 1:
                    self._delimiter = candidate
                    self._fieldnames = reader.fieldnames
                    return
        raise ValueError("Could not detect CSV delimiter")

    def stream(self) -> Iterator[dict]:
        """
        Stream rows from the CSV file one at a time.

        Each call reopens the file, so the generator can be consumed multiple
        times for separate validation passes.

        :return: Iterator of row dictionaries (as returned by ``csv.DictReader``).
        :rtype: Iterator[dict]
        """
        field_size_limit(100000000)
        with open(self.csv_fp, newline='', encoding='utf-8') as handle:
            # Note: the header row is consumed by DictReader itself; passing
            # fieldnames explicitly would make it treat the header as data.
            yield from DictReader(handle, delimiter=self._delimiter)

    def __iter__(self) -> Iterator[dict]:
        """
        Make the reader directly iterable.

        :return: Row iterator (delegates to :meth:`stream`).
        :rtype: Iterator[dict]
        """
        return self.stream()

301 

302 

class JSONLStreamIO:
    """
    Context manager for reading and writing JSON-Lines (JSONL) files.

    Each line in the file is a separate JSON object. Supports both read and
    write modes and can be used as an iterator for line-by-line consumption.
    Blank lines are tolerated when reading.
    """
    def __init__(self, jsonl_fp: str, mode: str = 'r') -> None:
        """
        Initialise the JSON-Lines handler.

        :param jsonl_fp: Path to the JSON-Lines file.
        :type jsonl_fp: str
        :param mode: File open mode (``'r'``, ``'w'``, or ``'a'``).
            Defaults to ``'r'``.
        :type mode: str
        :rtype: None
        """
        self.jsonl_fp = jsonl_fp
        self.mode = mode
        self._file = None

    def __enter__(self):
        """
        Open the underlying file and return this instance.

        :return: The :class:`JSONLStreamIO` instance.
        :rtype: JSONLStreamIO
        """
        self._file = open(self.jsonl_fp, self.mode, encoding='utf-8')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the underlying file on context exit.

        :rtype: None
        """
        if self._file:
            self._file.close()
            self._file = None  # prevent accidental writes on a closed handle

    def is_empty(self) -> bool:
        """
        Check whether the JSON-Lines file is empty or contains only blank lines.

        Whitespace-only lines are skipped rather than parsed, so a file made
        of blank lines is reported as empty instead of raising
        ``json.JSONDecodeError`` (``json.loads('')`` is invalid).

        :return: ``True`` if the file has no non-empty JSON lines, ``False`` otherwise.
        :rtype: bool
        """
        with open(self.jsonl_fp, 'r', encoding='utf-8') as f:
            for line in f:
                stripped = line.strip()
                # BUGFIX: previously json.loads was called on blank lines,
                # crashing on files that contain only blank lines.
                if stripped and json.loads(stripped):
                    return False
        return True

    def read(self) -> Iterator[dict]:
        """
        Read the JSON-Lines file, yielding one parsed JSON object per line.

        Blank lines (e.g. a trailing newline) are skipped instead of being
        fed to ``json.loads``, which would raise on an empty string.

        :return: Iterator of dictionaries parsed from each line.
        :rtype: Iterator[dict]
        """
        with open(self.jsonl_fp, 'r', encoding='utf-8') as f:
            for line in f:
                stripped = line.strip()
                if stripped:  # tolerate blank/trailing lines
                    yield json.loads(stripped)

    def __iter__(self) -> Iterator[dict]:
        """
        Make the handler directly iterable (delegates to :meth:`read`).

        :return: Iterator of parsed JSON objects.
        :rtype: Iterator[dict]
        """
        return self.read()

    def write(self, json_obj: dict) -> None:
        """
        Write a JSON object as a single line to the file.

        The file must already be open via the context manager in ``'w'`` or ``'a'`` mode.

        :param json_obj: JSON-serialisable object to write.
        :type json_obj: dict
        :raises ValueError: if the file has not been opened via the context manager.
        :rtype: None
        """
        if self._file is None:
            raise ValueError("File not open. Use context manager with mode='a' or 'w'.")
        self._file.write(json.dumps(json_obj) + '\n')

    def flush(self) -> None:
        """
        Flush the underlying file buffer.

        :rtype: None
        """
        if self._file:
            self._file.flush()

400 

def read_csv(csv_fp: str) -> list[dict]:
    """
    Read an entire CSV file into memory.

    .. deprecated::
        Use :class:`CSVStreamReader` for memory-efficient streaming instead.

    :param csv_fp: Path to the CSV file.
    :type csv_fp: str
    :return: List of row dictionaries.
    :rtype: list[dict]
    :raises ValueError: if no valid delimiter can be determined.
    """
    # Allow very large fields (100 MB) before parsing.
    field_size_limit(100000000)
    for candidate in (',', ';', '\t'):
        with open(csv_fp, newline='', encoding='utf-8') as handle:
            parsed = list(DictReader(handle, delimiter=candidate))
        # More than one column in the first row means the split looks right.
        if parsed and len(parsed[0]) > 1:
            return parsed
    raise ValueError("Could not detect CSV delimiter")