Coverage for oc_validator / helper.py: 99%
135 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 15:46 +0000
1# ISC License
2#
3# Copyright (c) 2023-2026, Elia Rizzetto, Silvio Peroni
4#
5# Permission to use, copy, modify, and/or distribute this software for any
6# purpose with or without fee is hereby granted, provided that the above
7# copyright notice and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
12# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
13# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
14# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
15# PERFORMANCE OF THIS SOFTWARE.
17from collections import defaultdict
18from csv import field_size_limit, DictReader
19from typing import Generator, Iterator, Optional
20import json
class UnionFind:
    """Union-Find (Disjoint Set Union) data structure for grouping related identifiers."""

    def __init__(self) -> None:
        """
        Initialise an empty Union-Find structure.

        :rtype: None
        """
        # Maps each element to its parent; a root maps to itself.
        self.parent: dict = dict()

    def find(self, x: str) -> str:
        """
        Return the root of the component containing *x*.

        If *x* has never been seen before it is registered as its own root.
        Path compression is applied on every lookup. The walk is iterative:
        since no union-by-rank is used, parent chains can grow linearly with
        the number of unions, and the previous recursive implementation could
        exhaust Python's recursion limit on long chains.

        :param x: Element identifier.
        :type x: str
        :return: Root identifier of the component.
        :rtype: str
        """
        if x not in self.parent:
            self.parent[x] = x
        # Walk up the chain to locate the root.
        root = x
        while self.parent[root] != root:
            root = self.parent[root]
        # Path compression: re-point every node on the path directly at the root.
        while self.parent[x] != root:
            self.parent[x], x = root, self.parent[x]
        return root

    def union(self, x: str, y: str) -> None:
        """
        Merge the components containing *x* and *y*.

        The root of *x* is made a child of the root of *y*.

        :param x: First element.
        :type x: str
        :param y: Second element.
        :type y: str
        :rtype: None
        """
        self.parent[self.find(x)] = self.find(y)
class Helper:
    """Container for utility functions used across the validation pipeline."""

    def __init__(self) -> None:
        """
        Initialise the Helper.

        :rtype: None
        """
        self.descr = 'contains helper functions'

    def group_ids(self, id_groups: list[set]) -> list[set]:
        """
        Group identifiers that co-occur in the same row into connected components.

        Uses a Union-Find algorithm so that two IDs are considered to belong to the
        same bibliographic entity if they appear together in at least one row.

        :param id_groups: List of sets, where each set contains the identifiers
                          from a single row field (e.g. ``id``, ``citing_id``).
        :type id_groups: list[set]
        :return: List of sets, each grouping the IDs of the same entity.
        :rtype: list[set]
        """
        uf = UnionFind()

        # Merge every ID in a row-group with the group's first ID.
        for id_set in id_groups:
            members = iter(id_set)
            anchor = next(members, None)
            for other in members:
                uf.union(anchor, other)

        # Collect each ID under the root of its component.
        clusters = defaultdict(set)
        for id_set in id_groups:
            for identifier in id_set:
                clusters[uf.find(identifier)].add(identifier)

        return list(clusters.values())

    def create_error_dict(self, validation_level: str, error_type: str, message: str, error_label: str, located_in: str,
                          table: dict, valid: bool = False) -> dict:
        """
        Create a dictionary representing a validation error or warning.

        :param validation_level: One of ``"csv_wellformedness"``, ``"external_syntax"``,
                                 ``"existence"``, or ``"semantic"``.
        :type validation_level: str
        :param error_type: One of ``"error"`` or ``"warning"``.
        :type error_type: str
        :param message: Human-readable error description.
        :type message: str
        :param error_label: Machine-readable label uniquely connected to one validation check.
        :type error_label: str
        :param located_in: Granularity of the error location — one of ``"row"``,
                           ``"field"``, or ``"item"``.
        :type located_in: str
        :param table: Tree structure pinpointing the exact position of all elements
                      involved in the error.
        :type table: dict
        :param valid: Whether the data is still considered valid despite the issue.
                      Defaults to ``False``.
        :type valid: bool
        :return: Error dictionary consumable by the report writer.
        :rtype: dict
        """
        return {
            'validation_level': validation_level,
            'error_type': error_type,
            'error_label': error_label,
            'valid': valid,
            'message': message,
            'position': {
                'located_in': located_in,
                'table': table,
            },
        }

    def create_validation_summary_stream(self, json_fp: str) -> Generator[str, None, None]:
        """
        Stream a natural-language summary of the validation error report.

        Performs two passes over the JSON-Lines file: the first counts errors
        per label and stores the explanation text, the second yields formatted
        lines grouped by error label.

        :param json_fp: Path to the JSON-Lines file containing the validation error report.
        :type json_fp: str
        :return: Generator yielding lines of the summary.
        :rtype: Generator[str, None, None]
        """
        # Pass 1: tally occurrences per label and remember one explanation each.
        counts: dict = {}
        explanations: dict = {}

        with JSONLStreamIO(json_fp) as stream:
            for record in stream:
                lbl = record['error_label']
                counts[lbl] = counts.get(lbl, 0) + 1
                explanations.setdefault(lbl, record['message'])

        # Pass 2: emit a header the first time a label is met, then one detail
        # line per occurrence, and a blank separator after each label group.
        with JSONLStreamIO(json_fp) as stream:

            seen = dict.fromkeys(counts, 0)

            for record in stream:
                lbl = record['error_label']
                total = counts[lbl]

                if seen[lbl] == 0:
                    # First occurrence of this label: yield the group header.
                    if total > 1:
                        yield f"There are {total} {lbl} issues in the document.\n"
                    else:
                        yield f"There is {total} {lbl} issue in the document.\n"
                    yield explanations[lbl] + "\n"

                # Build a human-readable location string from the position tree.
                locations = []
                for row_pos, fields in record['position']['table'].items():
                    for field_name, item_positions in fields.items():
                        locations.append(
                            f"row {row_pos}, field {field_name}, "
                            f"and items in position {item_positions}"
                        )
                where = "; ".join(locations)

                seen[lbl] += 1

                if total > 1:
                    yield f"- {record['error_type']} {seen[lbl]} involves: {where}.\n"
                else:
                    yield f"- The {record['error_type']} involves: {where}.\n"

                # Spacing between label groups once the last occurrence is emitted.
                if seen[lbl] == total:
                    yield "\n\n"
class CSVStreamReader:
    """
    A streamable CSV reader that yields rows one at a time, allowing for memory-efficient
    processing of large CSV files.

    Supports multiple passes by reopening the file. The delimiter is auto-detected
    from the first rows (tries comma, semicolon, and tab).
    """

    def __init__(self, csv_fp: str) -> None:
        """
        Initialise the reader and auto-detect the CSV delimiter and field names.

        :param csv_fp: Path to the CSV file to read.
        :type csv_fp: str
        :rtype: None
        """
        self.csv_fp = csv_fp
        self._delimiter: Optional[str] = None
        self._fieldnames: Optional[list[str]] = None
        self._detect_delimiter_and_fieldnames()

    def _detect_delimiter_and_fieldnames(self) -> None:
        """
        Detect the CSV delimiter and field names from the first rows.

        Tries ``','``, ``';'``, and ``'\\t'`` in order and selects the first one
        that produces a row with more than one column.

        :raises ValueError: if no valid delimiter can be determined.
        :rtype: None
        """
        field_size_limit(100000000)  # allow very large individual fields (100 MB)
        for candidate in (',', ';', '\t'):
            with open(self.csv_fp, newline='', encoding='utf-8') as handle:
                parsed = DictReader(handle, delimiter=candidate)
                try:
                    sample = next(parsed)
                except StopIteration:
                    continue  # no data rows with this delimiter; try the next one
                # More than one column means the candidate actually split the header.
                if sample and len(sample) > 1:
                    self._delimiter = candidate
                    self._fieldnames = parsed.fieldnames
                    return
        raise ValueError("Could not detect CSV delimiter")

    def stream(self) -> Iterator[dict]:
        """
        Stream rows from the CSV file one at a time.

        Each call reopens the file, so the generator can be consumed multiple
        times for separate validation passes.

        :return: Iterator of row dictionaries (as returned by ``csv.DictReader``).
        :rtype: Iterator[dict]
        """
        field_size_limit(100000000)
        with open(self.csv_fp, newline='', encoding='utf-8') as handle:
            # Deliberately NOT passing fieldnames: if fieldnames is specified,
            # DictReader treats the first row as data rather than as the header.
            yield from DictReader(handle, delimiter=self._delimiter)

    def __iter__(self) -> Iterator[dict]:
        """
        Make the reader directly iterable.

        :return: Row iterator (delegates to :meth:`stream`).
        :rtype: Iterator[dict]
        """
        return self.stream()
class JSONLStreamIO:
    """
    Context manager for reading and writing JSON-Lines (JSONL) files.

    Each line in the file is a separate JSON object. Supports both read and
    write modes and can be used as an iterator for line-by-line consumption.
    """

    def __init__(self, jsonl_fp: str, mode: str = 'r') -> None:
        """
        Initialise the JSON-Lines handler.

        :param jsonl_fp: Path to the JSON-Lines file.
        :type jsonl_fp: str
        :param mode: File open mode (``'r'``, ``'w'``, or ``'a'``).
                     Defaults to ``'r'``.
        :type mode: str
        :rtype: None
        """
        self.jsonl_fp = jsonl_fp
        self.mode = mode
        self._file = None  # set by __enter__; required for write()/flush()

    def __enter__(self):
        """
        Open the underlying file and return this instance.

        :return: The :class:`JSONLStreamIO` instance.
        :rtype: JSONLStreamIO
        """
        self._file = open(self.jsonl_fp, self.mode, encoding='utf-8')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the underlying file on context exit.

        :rtype: None
        """
        if self._file:
            self._file.close()

    def is_empty(self) -> bool:
        """
        Check whether the JSON-Lines file is empty or contains only blank lines.

        Blank lines are skipped before parsing: ``json.loads('')`` raises
        ``JSONDecodeError``, so the previous implementation crashed on files
        containing empty lines instead of reporting them as empty. Lines
        parsing to falsy JSON values (e.g. ``{}``) are also treated as empty.

        :return: ``True`` if the file has no non-empty JSON lines, ``False`` otherwise.
        :rtype: bool
        """
        with open(self.jsonl_fp, 'r', encoding='utf-8') as f:
            for line in f:
                stripped = line.strip()
                if stripped and json.loads(stripped):
                    return False
        return True

    def read(self) -> Iterator[dict]:
        """
        Read the JSON-Lines file, yielding one parsed JSON object per line.

        Blank lines are skipped rather than raising ``JSONDecodeError``.

        :return: Iterator of dictionaries parsed from each line.
        :rtype: Iterator[dict]
        """
        with open(self.jsonl_fp, 'r', encoding='utf-8') as f:
            for line in f:
                stripped = line.strip()
                if stripped:  # tolerate stray blank lines in hand-edited files
                    yield json.loads(stripped)

    def __iter__(self) -> Iterator[dict]:
        """
        Make the handler directly iterable (delegates to :meth:`read`).

        :return: Iterator of parsed JSON objects.
        :rtype: Iterator[dict]
        """
        return self.read()

    def write(self, json_obj: dict) -> None:
        """
        Write a JSON object as a single line to the file.

        The file must already be open via the context manager in ``'w'`` or ``'a'`` mode.

        :param json_obj: JSON-serialisable object to write.
        :type json_obj: dict
        :raises ValueError: if the file has not been opened via the context manager.
        :rtype: None
        """
        if self._file is None:
            raise ValueError("File not open. Use context manager with mode='a' or 'w'.")
        self._file.write(json.dumps(json_obj) + '\n')

    def flush(self) -> None:
        """
        Flush the underlying file buffer.

        :rtype: None
        """
        if self._file:
            self._file.flush()
def read_csv(csv_fp: str) -> list[dict]:
    """
    Read an entire CSV file into memory.

    .. deprecated::
        Use :class:`CSVStreamReader` for memory-efficient streaming instead.

    :param csv_fp: Path to the CSV file.
    :type csv_fp: str
    :return: List of row dictionaries.
    :rtype: list[dict]
    :raises ValueError: if no valid delimiter can be determined.
    """
    field_size_limit(100000000)  # allow very large individual fields (100 MB)
    for sep in (',', ';', '\t'):
        with open(csv_fp, newline='', encoding='utf-8') as handle:
            parsed_rows = [row for row in DictReader(handle, delimiter=sep)]
        # More than one key per row means this separator actually split the header.
        if parsed_rows and len(parsed_rows[0]) > 1:
            return parsed_rows
    raise ValueError("Could not detect CSV delimiter")