# ISC License
#
# Copyright (c) 2023-2026, Elia Rizzetto, Silvio Peroni
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.

from csv import DictReader, field_size_limit
from yaml import full_load
from json import load, dumps
from os.path import exists, join, dirname, abspath
from os import makedirs, getcwd
from re import finditer
import tempfile
import shutil
import lmdb
from typing import Optional
from oc_validator.helper import Helper, read_csv, CSVStreamReader, JSONLStreamIO
from oc_validator.csv_wellformedness import Wellformedness
from oc_validator.id_syntax import IdSyntax
from oc_validator.id_existence import IdExistence
from oc_validator.semantics import Semantics
from oc_validator.table_reader import read_metadata_row, read_citations_row
from oc_validator.lmdb_cache import LmdbCache, InMemoryCache, LmdbUnionFind, InMemoryUnionFind, UnionFind
from tqdm import tqdm
from argparse import ArgumentParser
from oc_validator import configure_logging, logger
from time import time


# --- Custom Exception classes. ---
class ValidationError(Exception):
    """Base class for errors related to the validation process."""
    pass


class InvalidTableError(ValidationError):
    """Raised when the submitted table cannot be identified as META-CSV or CITS-CSV and therefore cannot be processed."""
    def __init__(self, input_fp):
        super().__init__('The submitted table does not meet the required basic formatting standards. '
                         'Please ensure that both the metadata and citations tables are valid CSV files following the correct structure: '
                         'the metadata table must have the following columns: "id", "title", "author", "pub_date", "venue", "volume", "issue", "page", "type", "publisher", "editor"; '
                         'the citations table must have either four columns ("citing_id", "citing_publication_date", "cited_id", "cited_publication_date") or two columns ("citing_id", "cited_id"). '
                         'Refer to the documentation at https://github.com/opencitations/crowdsourcing/blob/main/README.md for the expected format and structure before resubmitting your deposit.')
        self.input_fp = input_fp


class TableNotMatchingInstance(ValidationError):
    """Raised when the table submitted to a specific Validator instance in ClosureValidator does not match the
    validation type expected by that instance, e.g. a CITS-CSV table is submitted to a Validator instance that is
    intended to process a META-CSV table.
    """
    def __init__(self, input_fp, detected_table_type, correct_table_type):
        super().__init__(f'The submitted table in file "{input_fp}" is of type {detected_table_type}, but should be of type {correct_table_type} instead.')
        self.input_fp = input_fp
        self.detected_table_type = detected_table_type
        self.correct_table_type = correct_table_type

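
# A hedged usage sketch (the file path below is illustrative, not part of this repository):
# both exception classes derive from ValidationError and carry the offending file path in
# `input_fp`, so callers can catch the base class to handle either failure mode:
#
#     try:
#         with Validator('submission/table.csv', 'out/') as v:
#             v.validate()
#     except ValidationError as e:
#         print(f'Could not validate {e.input_fp}: {e}')
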

# --- Class for the main process; validates one document at a time via the Validator.validate() method. ---
class Validator:
    def __init__(self, csv_doc: str, output_dir: str, use_meta_endpoint=False, verify_id_existence=True,
                 use_lmdb=False, map_size: int = 1 * 1024**3, cache_dir: str = None, verbose: bool = False,
                 log_file: str = None):
        """
        Initialize the Validator.

        :param csv_doc: Path to the CSV file to validate
        :param output_dir: Directory to store validation output
        :param use_meta_endpoint: Whether to use the OC Meta endpoint for ID existence checks
        :param verify_id_existence: Whether to verify ID existence
        :param use_lmdb: If True, use LMDB for caching (recommended for large files)
        :param map_size: Maximum size in bytes for each LMDB environment (default 1 GB)
        :param cache_dir: Optional base directory under which all LMDB caches are created
        :param verbose: If True, enable DEBUG-level logging output
        :param log_file: If provided, write logs to this file instead of the terminal
        """
        self.csv_doc = csv_doc
        self.verbose = verbose
        configure_logging(verbose, log_file)
        logger.debug("Initializing Validator for '%s' (output: '%s')", csv_doc, output_dir)
        self.csv_stream = CSVStreamReader(csv_doc)  # Use streaming instead of loading all data
        self.table_to_process = self.process_selector()
        self.helper = Helper()
        self.wellformed = Wellformedness()
        self.syntax = IdSyntax()
        self.existence = IdExistence(use_meta_endpoint=use_meta_endpoint)
        self.semantics = Semantics()
        script_dir = dirname(abspath(__file__))  # Directory where the script is located
        with open(join(script_dir, 'messages.yaml'), 'r', encoding='utf-8') as fm:
            self.messages = full_load(fm)
        with open(join(script_dir, 'id_type_alignment.json'), 'r', encoding='utf-8') as fa:
            self.id_type_dict = load(fa)
        self.output_dir = output_dir
        if not exists(self.output_dir):
            makedirs(self.output_dir)
        if self.table_to_process == 'meta_csv':
            self.output_fp_json = self._make_output_filepath('out_validate_meta', 'jsonl')
            self.output_fp_txt = self._make_output_filepath('meta_validation_summary', 'txt')
        elif self.table_to_process == 'cits_csv':
            self.output_fp_json = self._make_output_filepath('out_validate_cits', 'jsonl')
            self.output_fp_txt = self._make_output_filepath('cits_validation_summary', 'txt')

        logger.debug("Detected table type: %s", self.table_to_process)
        logger.debug("Output files: jsonl='%s', txt='%s'", self.output_fp_json, self.output_fp_txt)

        # Initialize the ID cache based on the use_lmdb flag (stored as memory_efficient)
        self.memory_efficient = use_lmdb
        self.map_size = map_size
        self._cache_dir = cache_dir

        cache_name = f'validator_{hash(csv_doc)}'
        if use_lmdb:
            self.id_cache = LmdbCache(cache_name, base_dir=self._cache_dir or '.', map_size=self.map_size)
        else:
            self.id_cache = InMemoryCache(cache_name)

        logger.info("Cache type: %s", 'LMDB' if use_lmdb else 'in-memory')

        # Open the cache
        self.id_cache.open()

        self.verify_id_existence = verify_id_existence
        self._uf = None
        self._uf_env = None
        self._uf_tmp_dir = None
        self.duplicate_data_cache = None

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensures the cache is properly closed."""
        self.close()
        return False

    def close(self):
        """Close the cache and clean up resources."""
        if hasattr(self, 'id_cache') and self.id_cache is not None:
            self.id_cache.close()
        if hasattr(self, '_uf_env') and self._uf_env is not None:
            self._uf_env.close()
            self._uf_env = None
        if hasattr(self, '_uf_tmp_dir') and self._uf_tmp_dir is not None:
            shutil.rmtree(self._uf_tmp_dir, ignore_errors=True)
            self._uf_tmp_dir = None
        if hasattr(self, 'duplicate_data_cache') and self.duplicate_data_cache is not None:
            self.duplicate_data_cache.close()
            self.duplicate_data_cache = None

    def process_selector(self) -> str:
        """
        Detect the table type by streaming the first few rows.

        Reads up to 10 rows to determine whether the CSV is a META-CSV or a
        CITS-CSV, then returns the corresponding identifier string.

        :return: ``'meta_csv'`` or ``'cits_csv'``.
        :rtype: str
        :raises InvalidTableError: if the table structure cannot be recognised.
        """
        # Read the first few rows to determine the table type
        sample_rows = []
        for i, row in enumerate(self.csv_stream):
            if i >= 10:  # Only need the first 10 rows to determine the type
                break
            sample_rows.append(row)

        if not sample_rows:
            raise InvalidTableError(self.csv_doc)

        process_type = None
        try:
            if all(set(row.keys()) == {"id", "title", "author", "pub_date", "venue", "volume", "issue", "page", "type",
                                       "publisher", "editor"} for row in sample_rows):
                process_type = 'meta_csv'
                return process_type
            elif all(set(row.keys()) == {'citing_id', 'citing_publication_date', 'cited_id', 'cited_publication_date'} for row in sample_rows):
                process_type = 'cits_csv'
                return process_type
            elif all(set(row.keys()) == {'citing_id', 'cited_id'} for row in sample_rows):  # also support Index tables with no publication dates
                process_type = 'cits_csv'
                return process_type
            else:
                raise InvalidTableError(self.csv_doc)
        except KeyError:
            raise InvalidTableError(self.csv_doc)

    def _make_output_filepath(self, base_filename: str, extension: str) -> str:
        """
        Generate a unique output filepath.

        If a file with the base name already exists in the output directory,
        an incrementing counter is appended to the filename.

        :param base_filename: Base name for the output file (without extension).
        :type base_filename: str
        :param extension: File extension (e.g. ``'jsonl'``, ``'txt'``).
        :type extension: str
        :return: Absolute path to a non-existing output file.
        :rtype: str
        """
        full_path = join(self.output_dir, f"{base_filename}.{extension}")
        counter = 1

        # If the filepath already exists, append the counter and check again
        while exists(full_path):
            full_path = join(self.output_dir, f"{base_filename}_{counter}.{extension}")
            counter += 1

        return full_path

    def validate(self) -> bool:
        """
        Run the full validation pipeline on the input CSV document.

        Dispatches to :meth:`validate_meta` or :meth:`validate_cits` depending
        on the detected table type.

        :return: ``True`` if the table is valid (no issues found), ``False`` otherwise.
        :rtype: bool
        """
        logger.info("Starting validation of '%s'", self.csv_doc)
        try:
            start = time()
            if self.table_to_process == 'meta_csv':
                result = self.validate_meta()
            elif self.table_to_process == 'cits_csv':
                result = self.validate_cits()
            logger.info("Validation of '%s' complete. Valid: %s", self.csv_doc, result)
            return result
        finally:
            logger.info(f"Cleaning up resources for {self.table_to_process} table...")
            self.existence.close()
            if self.id_cache._is_open:
                self.id_cache.close()
            logger.info(f"Process finished in {(time() - start)/60:.2f} minutes.")

    def validate_meta(self) -> bool:
        """
        Validate an instance of META-CSV using JSON-Lines streaming output.

        :return: True if the table is valid (i.e. no issues found), False otherwise.
        """
        logger.info("Validating META-CSV: '%s'", self.csv_doc)
        messages = self.messages
        id_type_dict = self.id_type_dict

        # Set up the Union-Find and the cache for duplicate detection.
        # NOTE: if self.memory_efficient is True, these open LMDB envs which must be
        # closed (deleting the related dir) via self.close()
        if self.memory_efficient:
            tmp_base = self._cache_dir or '.'
            uf_tmp_dir = tempfile.mkdtemp(prefix='uf_dup_meta_', dir=tmp_base)
            uf_env = lmdb.open(uf_tmp_dir, map_size=self.map_size, sync=False, metasync=False)
            uf = LmdbUnionFind(uf_env)
        else:
            uf = InMemoryUnionFind()
            uf_tmp_dir = None
            uf_env = None

        self._uf = uf
        self._uf_tmp_dir = uf_tmp_dir
        self._uf_env = uf_env

        dup_cache_name = f'dup_meta_{abs(hash(self.csv_doc))}'
        if self.memory_efficient:
            duplicate_data_cache = LmdbCache(dup_cache_name, base_dir=self._cache_dir or '.', map_size=self.map_size)
        else:
            duplicate_data_cache = InMemoryCache(dup_cache_name)
        duplicate_data_cache.open()
        self.duplicate_data_cache = duplicate_data_cache

        # Open the JSON-L file for streaming output
        with JSONLStreamIO(self.output_fp_json, 'a') as jsonl_file:
            for row_idx, row in enumerate(tqdm(self.csv_stream.stream(), desc="Validating")):
                # Recreate the SPARQL client periodically to prevent memory growth
                if row_idx > 0 and row_idx % 10000 == 0 and self.existence.use_meta_endpoint:
                    self.existence._recreate_sparql_client()

                row_ok = True  # switch for row well-formedness
                id_ok = True  # switch for id field well-formedness
                type_ok = True  # switch for type field well-formedness

                # Collect ID data for duplicate detection
                id_value = row.get('id', '')
                duplicate_data_cache[str(row_idx)] = id_value
                if id_value:
                    items = id_value.split(' ')
                    non_empty = [i for i in items if i]
                    if non_empty:
                        uf.find(non_empty[0])
                        for _i in range(1, len(non_empty)):
                            uf.union(non_empty[0], non_empty[_i])

                missing_required_fields = self.wellformed.get_missing_values(row)  # dict w/ positions of errors in the row; empty if the row is fine
                if missing_required_fields:
                    message = messages['m17']
                    table = {row_idx: missing_required_fields}
                    error = self.helper.create_error_dict(
                        validation_level='csv_wellformedness',
                        error_type='error',
                        message=message,
                        error_label='required_fields',
                        located_in='field',
                        table=table)
                    jsonl_file.write(error)
                    row_ok = False

                # Parse the row into a structured object
                row_obj = read_metadata_row(row)

                for field, value in row.items():

                    if field == 'id':
                        # Use the structured object's parsed id field
                        items = row_obj.id
                        if items:
                            br_ids_set = set()  # set where to put well-formed br IDs only

                            for item_idx, item in enumerate(items):

                                if item == '':
                                    message = messages['m1']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='extra_space',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                elif not self.wellformed.wellformedness_br_id(item):
                                    message = messages['m2']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='br_id_format',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                else:
                                    if item not in br_ids_set:
                                        br_ids_set.add(item)
                                    else:  # in-field duplication of the same ID
                                        table = {row_idx: {field: [i for i, v in enumerate(items) if v == item]}}
                                        message = messages['m6']

                                        error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                              error_type='error',
                                                                              message=message,
                                                                              error_label='duplicate_id',
                                                                              located_in='item',
                                                                              table=table)  # valid=False
                                        jsonl_file.write(error)

                                    # 2nd validation level: EXTERNAL SYNTAX OF ID (BIBLIOGRAPHIC RESOURCE)
                                    if not self.syntax.check_id_syntax(item):
                                        message = messages['m19']
                                        table = {row_idx: {field: [item_idx]}}
                                        error = self.helper.create_error_dict(validation_level='external_syntax',
                                                                              error_type='error',
                                                                              message=message,
                                                                              error_label='br_id_syntax',
                                                                              located_in='item',
                                                                              table=table)
                                        jsonl_file.write(error)
                                    # 3rd validation level: EXISTENCE OF ID (BIBLIOGRAPHIC RESOURCE)
                                    else:
                                        if self.verify_id_existence:  # if verify_id_existence is False just skip these operations
                                            message = messages['m20']
                                            table = {row_idx: {field: [item_idx]}}
                                            if item not in self.id_cache:
                                                if not self.existence.check_id_existence(item):
                                                    error = self.helper.create_error_dict(validation_level='existence',
                                                                                          error_type='warning',
                                                                                          message=message,
                                                                                          error_label='br_id_existence',
                                                                                          located_in='item',
                                                                                          table=table, valid=True)
                                                    jsonl_file.write(error)
                                                    self.id_cache[item] = False
                                                else:
                                                    self.id_cache[item] = True
                                            elif self.id_cache[item] is False:
                                                error = self.helper.create_error_dict(validation_level='existence',
                                                                                      error_type='warning',
                                                                                      message=message,
                                                                                      error_label='br_id_existence',
                                                                                      located_in='item',
                                                                                      table=table, valid=True)
                                                jsonl_file.write(error)

                            if len(br_ids_set) != len(items):  # --> some well-formedness error occurred in the id field
                                id_ok = False

                    elif field == 'title':
                        if value:
                            if value.isupper():
                                message = messages['m8']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='warning',
                                                                      message=message,
                                                                      error_label='uppercase_title',
                                                                      located_in='item',
                                                                      table=table,
                                                                      valid=True)
                                jsonl_file.write(error)

                    elif field == 'author' or field == 'editor':
                        # Use the structured object's parsed field
                        if field == 'author':
                            agents = row_obj.author
                        else:  # field == 'editor'
                            agents = row_obj.editor

                        if agents:
                            items = agents  # Already a parsed list of AgentItem objects

                            # Check in-field duplication based on shared RA IDs
                            dup_groups = self.wellformed.check_duplicate_ra_by_id(items)
                            for dup_indices in dup_groups:
                                table = {row_idx: {field: dup_indices}}
                                message = messages['m26']

                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='duplicate_ra',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                            for item_idx, item in enumerate(items):
                                # Check for an orphan RA ID using the raw string
                                if self.wellformed.orphan_ra_id(item._raw):
                                    message = messages['m10']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='warning',
                                                                          message=message,
                                                                          error_label='orphan_ra_id',
                                                                          located_in='item',
                                                                          table=table,
                                                                          valid=True)
                                    jsonl_file.write(error)

                                # Validate using the raw string
                                if not self.wellformed.wellformedness_people_item(item._raw):
                                    message = messages['m9']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='people_item_format',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                else:

                                    # Use the structured object's ids attribute
                                    ids = item.ids

                                    for id in ids:
                                        # 2nd validation level: EXTERNAL SYNTAX OF ID (RESPONSIBLE AGENT)
                                        if not self.syntax.check_id_syntax(id):
                                            message = messages['m21']
                                            table = {row_idx: {field: [item_idx]}}
                                            error = self.helper.create_error_dict(validation_level='external_syntax',
                                                                                  error_type='error',
                                                                                  message=message,
                                                                                  error_label='ra_id_syntax',
                                                                                  located_in='item',
                                                                                  table=table)
                                            jsonl_file.write(error)
                                        # 3rd validation level: EXISTENCE OF ID (RESPONSIBLE AGENT)
                                        else:
                                            if self.verify_id_existence:  # if verify_id_existence is False just skip these operations
                                                message = messages['m22']
                                                table = {row_idx: {field: [item_idx]}}
                                                if id not in self.id_cache:
                                                    if not self.existence.check_id_existence(id):
                                                        error = self.helper.create_error_dict(validation_level='existence',
                                                                                              error_type='warning',
                                                                                              message=message,
                                                                                              error_label='ra_id_existence',
                                                                                              located_in='item',
                                                                                              table=table,
                                                                                              valid=True)
                                                        jsonl_file.write(error)
                                                        self.id_cache[id] = False
                                                    else:
                                                        self.id_cache[id] = True
                                                elif self.id_cache[id] is False:
                                                    error = self.helper.create_error_dict(validation_level='existence',
                                                                                          error_type='warning',
                                                                                          message=message,
                                                                                          error_label='ra_id_existence',
                                                                                          located_in='item',
                                                                                          table=table,
                                                                                          valid=True)
                                                    jsonl_file.write(error)

                    elif field == 'pub_date':
                        if value:
                            if not self.wellformed.wellformedness_date(value):
                                message = messages['m3']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='date_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                    elif field == 'venue':
                        # Use the structured object's parsed field
                        venue = row_obj.venue
                        if venue:

                            # Check for an orphan venue ID using the raw string
                            if self.wellformed.orphan_venue_id(venue._raw):
                                message = messages['m15']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='warning',
                                                                      message=message,
                                                                      error_label='orphan_venue_id',
                                                                      located_in='item',
                                                                      table=table,
                                                                      valid=True)
                                jsonl_file.write(error)

                            # Validate using the raw string
                            if not self.wellformed.wellformedness_venue(venue._raw):
                                message = messages['m12']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='venue_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                            else:
                                # Use the structured object's ids attribute
                                ids = venue.ids

                                for id in ids:

                                    # 2nd validation level: EXTERNAL SYNTAX OF ID (BIBLIOGRAPHIC RESOURCE)
                                    if not self.syntax.check_id_syntax(id):
                                        message = messages['m19']
                                        table = {row_idx: {field: [0]}}
                                        error = self.helper.create_error_dict(validation_level='external_syntax',
                                                                              error_type='error',
                                                                              message=message,
                                                                              error_label='br_id_syntax',
                                                                              located_in='item',
                                                                              table=table)
                                        jsonl_file.write(error)
                                    # 3rd validation level: EXISTENCE OF ID (BIBLIOGRAPHIC RESOURCE)
                                    else:
                                        if self.verify_id_existence:  # if verify_id_existence is False just skip these operations
                                            message = messages['m20']
                                            table = {row_idx: {field: [0]}}
                                            if id not in self.id_cache:
                                                if not self.existence.check_id_existence(id):
                                                    error = self.helper.create_error_dict(validation_level='existence',
                                                                                          error_type='warning',
                                                                                          message=message,
                                                                                          error_label='br_id_existence',
                                                                                          located_in='item',
                                                                                          table=table,
                                                                                          valid=True)
                                                    jsonl_file.write(error)
                                                    self.id_cache[id] = False
                                                else:
                                                    self.id_cache[id] = True
                                            elif self.id_cache[id] is False:
                                                error = self.helper.create_error_dict(validation_level='existence',
                                                                                      error_type='warning',
                                                                                      message=message,
                                                                                      error_label='br_id_existence',
                                                                                      located_in='item',
                                                                                      table=table,
                                                                                      valid=True)
                                                jsonl_file.write(error)

                    elif field == 'volume':
                        if value:
                            if not self.wellformed.wellformedness_volume_issue(value):
                                message = messages['m13']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='volume_issue_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                    elif field == 'issue':
                        if value:
                            if not self.wellformed.wellformedness_volume_issue(value):
                                message = messages['m13']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='volume_issue_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                    elif field == 'page':
                        if value:
                            if not self.wellformed.wellformedness_page(value):
                                message = messages['m14']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='page_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)
                            else:
                                if not self.wellformed.check_page_interval(value):
                                    message = messages['m18']
                                    table = {row_idx: {field: [0]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='warning',
                                                                          message=message,
                                                                          error_label='page_interval',
                                                                          located_in='item',
                                                                          table=table,
                                                                          valid=True)
                                    jsonl_file.write(error)

                    elif field == 'type':
                        if value:
                            if not self.wellformed.wellformedness_type(value):
                                message = messages['m16']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='type_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                                type_ok = False

                    elif field == 'publisher':
                        # Use the structured object's parsed field
                        publishers = row_obj.publisher
                        if publishers:
                            items = publishers  # Already a parsed list of AgentItem objects

                            # Check in-field duplication based on raw-string exact match
                            dup_groups = self.wellformed.check_duplicate_publisher_by_raw(items)
                            for dup_indices in dup_groups:
                                table = {row_idx: {field: dup_indices}}
                                message = messages['m26']

                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='duplicate_ra',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                            for item_idx, item in enumerate(items):
                                # Check for an orphan RA ID using the raw string
                                if self.wellformed.orphan_ra_id(item._raw):
                                    message = messages['m10']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='warning',
                                                                          message=message,
                                                                          error_label='orphan_ra_id',
                                                                          located_in='item',
                                                                          table=table,
                                                                          valid=True)
                                    jsonl_file.write(error)

                                # Validate using the raw string
                                if not self.wellformed.wellformedness_publisher_item(item._raw):
                                    message = messages['m9']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='publisher_format',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                else:

                                    # Use the structured object's ids attribute
                                    ids = item.ids

                                    for id in ids:

                                        # 2nd validation level: EXTERNAL SYNTAX OF ID (RESPONSIBLE AGENT)
                                        if not self.syntax.check_id_syntax(id):
                                            message = messages['m21']
                                            table = {row_idx: {field: [item_idx]}}
                                            error = self.helper.create_error_dict(validation_level='external_syntax',
                                                                                  error_type='error',
                                                                                  message=message,
                                                                                  error_label='ra_id_syntax',
                                                                                  located_in='item',
                                                                                  table=table)
                                            jsonl_file.write(error)
                                        # 3rd validation level: EXISTENCE OF ID (RESPONSIBLE AGENT)
                                        else:
                                            if self.verify_id_existence:  # if verify_id_existence is False just skip these operations
                                                message = messages['m22']
                                                table = {row_idx: {field: [item_idx]}}
                                                if id not in self.id_cache:
                                                    if not self.existence.check_id_existence(id):
                                                        error = self.helper.create_error_dict(validation_level='existence',
                                                                                              error_type='warning',
                                                                                              message=message,
                                                                                              error_label='ra_id_existence',
                                                                                              located_in='item',
                                                                                              table=table,
                                                                                              valid=True)
                                                        jsonl_file.write(error)
                                                        self.id_cache[id] = False
                                                    else:
                                                        self.id_cache[id] = True
                                                elif self.id_cache[id] is False:
                                                    error = self.helper.create_error_dict(validation_level='existence',
                                                                                          error_type='warning',
                                                                                          message=message,
                                                                                          error_label='ra_id_existence',
                                                                                          located_in='item',
                                                                                          table=table,
                                                                                          valid=True)
                                                    jsonl_file.write(error)

                if row_ok and id_ok and type_ok:  # row semantics is checked only when the involved parts are well-formed

                    invalid_semantics = self.semantics.check_semantics(row, id_type_dict)
                    if invalid_semantics:
                        message = messages['m23']
                        table = {row_idx: invalid_semantics}
                        error = self.helper.create_error_dict(validation_level='semantics',
                                                              error_type='error',
                                                              message=message,
                                                              error_label='row_semantics',
                                                              located_in='field',
                                                              table=table)
                        jsonl_file.write(error)

            # GET DUPLICATE BIBLIOGRAPHIC ENTITIES (LMDB-backed, no in-memory entity list needed)
            duplicate_report = self.wellformed.get_duplicates_meta(
                uf=uf, data_cache=duplicate_data_cache, messages=messages)
            for error in duplicate_report:
                jsonl_file.write(error)

        logger.info("META-CSV validation complete, writing summary to '%s'", self.output_fp_txt)

        # Write the human-readable validation summary to a txt file
        textual_report_stream = self.helper.create_validation_summary_stream(self.output_fp_json)
        with open(self.output_fp_txt, 'w', encoding='utf-8') as f:
            for l in textual_report_stream:
                f.write(l)

        is_valid = JSONLStreamIO(self.output_fp_json).is_empty()
        logger.info("META-CSV validation result for '%s': %s", self.csv_doc, 'valid' if is_valid else 'invalid')
        return is_valid

    def validate_cits(self) -> bool:
        """
        Validate an instance of CITS-CSV using JSON-Lines streaming output.

        :return: True if the table is valid (i.e. no issues found), False otherwise.
        """
        logger.info("Validating CITS-CSV: '%s'", self.csv_doc)
        messages = self.messages

        # Set up the Union-Find and the cache for duplicate detection
        if self.memory_efficient:
            tmp_base = self._cache_dir or '.'
            uf_tmp_dir = tempfile.mkdtemp(prefix='uf_dup_cits_', dir=tmp_base)
            uf_env = lmdb.open(uf_tmp_dir, map_size=self.map_size, sync=False, metasync=False)
            uf = LmdbUnionFind(uf_env)
        else:
            uf = InMemoryUnionFind()
            uf_tmp_dir = None
            uf_env = None

        self._uf = uf
        self._uf_tmp_dir = uf_tmp_dir
        self._uf_env = uf_env

        dup_cache_name = f'dup_cits_{abs(hash(self.csv_doc))}'
        if self.memory_efficient:
            duplicate_data_cache = LmdbCache(dup_cache_name, base_dir=self._cache_dir or '.', map_size=self.map_size)
        else:
            duplicate_data_cache = InMemoryCache(dup_cache_name)
        duplicate_data_cache.open()
        self.duplicate_data_cache = duplicate_data_cache

        # Open the JSON-L file for streaming output
        with JSONLStreamIO(self.output_fp_json, 'a') as jsonl_file:
            for row_idx, row in enumerate(tqdm(self.csv_stream.stream(), desc="Validating")):
                # Recreate the SPARQL client periodically to prevent memory growth
                if row_idx > 0 and row_idx % 10000 == 0 and self.existence.use_meta_endpoint:
                    self.existence._recreate_sparql_client()

                # Collect ID data for duplicate detection
                citing_id = row.get('citing_id', '')
                cited_id = row.get('cited_id', '')
                duplicate_data_cache[str(row_idx)] = (citing_id, cited_id)
                for id_value in (citing_id, cited_id):
                    if id_value:
                        items = id_value.split(' ')
                        non_empty = [i for i in items if i]
                        if non_empty:
                            uf.find(non_empty[0])
                            for _i in range(1, len(non_empty)):
                                uf.union(non_empty[0], non_empty[_i])

                # Parse the row into a structured object
                row_obj = read_citations_row(row)

                for field, value in row.items():
                    if field == 'citing_id' or field == 'cited_id':
                        # Use the structured object's parsed field
                        if field == 'citing_id':
                            items = row_obj.citing_id
                        else:  # field == 'cited_id'
                            items = row_obj.cited_id

                        if not items:  # Check required fields
                            message = messages['m7']
                            table = {row_idx: {field: None}}
                            error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                  error_type='error',
                                                                  message=message,
                                                                  error_label='required_value_cits',
                                                                  located_in='field',
                                                                  table=table)
                            jsonl_file.write(error)
                        else:  # i.e. if the string is not empty...
                            ids_set = set()  # set where to put valid IDs only

                            for item_idx, item in enumerate(items):

                                if item == '':
                                    message = messages['m1']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='extra_space',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                elif not self.wellformed.wellformedness_br_id(item):
                                    message = messages['m2']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='br_id_format',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                else:
                                    if item not in ids_set:
                                        ids_set.add(item)
                                    else:  # in-field duplication of the same ID

                                        table = {row_idx: {field: [i for i, v in enumerate(items) if v == item]}}
                                        message = messages['m6']

                                        error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                              error_type='error',
                                                                              message=message,
                                                                              error_label='duplicate_id',
                                                                              located_in='item',
                                                                              table=table)  # 'valid'=False
                                        jsonl_file.write(error)
                                    # 2nd validation level: EXTERNAL SYNTAX OF ID (BIBLIOGRAPHIC RESOURCE)
                                    if not self.syntax.check_id_syntax(item):
                                        message = messages['m19']
                                        table = {row_idx: {field: [item_idx]}}
                                        error = self.helper.create_error_dict(validation_level='external_syntax',
                                                                              error_type='error',
                                                                              message=message,
                                                                              error_label='br_id_syntax',
                                                                              located_in='item',
                                                                              table=table)
                                        jsonl_file.write(error)
                                    # 3rd validation level: EXISTENCE OF ID (BIBLIOGRAPHIC RESOURCE)
                                    else:
                                        if self.verify_id_existence:  # if verify_id_existence is False just skip these operations
                                            message = messages['m20']
                                            table = {row_idx: {field: [item_idx]}}
                                            if item not in self.id_cache:
                                                if not self.existence.check_id_existence(item):
                                                    error = self.helper.create_error_dict(validation_level='existence',
                                                                                          error_type='warning',
                                                                                          message=message,
                                                                                          error_label='br_id_existence',
                                                                                          located_in='item',
                                                                                          table=table, valid=True)
                                                    jsonl_file.write(error)
                                                    self.id_cache[item] = False
                                                else:
                                                    self.id_cache[item] = True
                                            elif self.id_cache[item] is False:
                                                error = self.helper.create_error_dict(validation_level='existence',
                                                                                      error_type='warning',
                                                                                      message=message,
                                                                                      error_label='br_id_existence',
                                                                                      located_in='item',
                                                                                      table=table, valid=True)
                                                jsonl_file.write(error)

                    elif field == 'citing_publication_date' or field == 'cited_publication_date':
                        if value:
                            if not self.wellformed.wellformedness_date(value):
                                message = messages['m3']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='date_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

            # GET SELF-CITATIONS AND DUPLICATE CITATIONS (LMDB-backed, no in-memory entity list needed)
            duplicate_report = self.wellformed.get_duplicates_cits(
                uf=uf, data_cache=duplicate_data_cache, messages=messages)
            for error in duplicate_report:
                jsonl_file.write(error)

        logger.info("CITS-CSV validation complete, writing summary to '%s'", self.output_fp_txt)

        # Write the human-readable validation summary to a txt file
        textual_report_stream = self.helper.create_validation_summary_stream(self.output_fp_json)
        with open(self.output_fp_txt, "w", encoding='utf-8') as f:
            for l in textual_report_stream:
                f.write(l)

        is_valid = JSONLStreamIO(self.output_fp_json).is_empty()
        logger.info("CITS-CSV validation result for '%s': %s", self.csv_doc, 'valid' if is_valid else 'invalid')
        return is_valid

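
# A minimal usage sketch for the class above (the paths are illustrative assumptions, not
# files shipped with the package). The context-manager form guarantees that close() runs, so
# LMDB environments and temporary union-find directories are released even on errors:
#
#     with Validator('data/meta.csv', 'out/', use_lmdb=True, cache_dir='/tmp/ocv') as v:
#         table_is_valid = v.validate()
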

class ClosureValidator:
    """
    Validate a META-CSV and a CITS-CSV together, checking both their
    individual correctness and the transitive closure between the two tables.

    The closure check verifies that every entity referenced in the citations has
    corresponding metadata and vice versa.
    """

    def __init__(self, meta_in: str, meta_out_dir: str, cits_in: str, cits_out_dir: str,
                 strict_sequentiality: bool = False, meta_kwargs: Optional[dict] = None,
                 cits_kwargs: Optional[dict] = None, use_lmdb: bool = False,
                 map_size: int = 1 * 1024**3, cache_dir: Optional[str] = None,
                 verbose: bool = False, log_file: Optional[str] = None) -> None:
        """
        Initialise the ClosureValidator with META-CSV and CITS-CSV file paths.

        Creates two internal :class:`Validator` instances and verifies that each
        receives the expected table type.

        :param meta_in: Path to the META-CSV file.
        :type meta_in: str
        :param meta_out_dir: Directory for META-CSV validation output.
        :type meta_out_dir: str
        :param cits_in: Path to the CITS-CSV file.
        :type cits_in: str
        :param cits_out_dir: Directory for CITS-CSV validation output.
        :type cits_out_dir: str
        :param strict_sequentiality: If ``True``, skip the closure check when
            the individual validations already report errors. Defaults to ``False``.
        :type strict_sequentiality: bool
        :param meta_kwargs: Extra keyword arguments forwarded to the META-CSV
            :class:`Validator`.
        :type meta_kwargs: Optional[dict]
        :param cits_kwargs: Extra keyword arguments forwarded to the CITS-CSV
            :class:`Validator`.
        :type cits_kwargs: Optional[dict]
        :param use_lmdb: If ``True``, use LMDB for caching (recommended for large files).
        :type use_lmdb: bool
        :param map_size: Maximum size in bytes for each LMDB environment (default 1 GB).
        :type map_size: int
        :param cache_dir: Optional base directory under which all LMDB caches are created.
        :type cache_dir: Optional[str]
        :param verbose: If ``True``, enable DEBUG-level logging output.
        :type verbose: bool
        :param log_file: If provided, write logs to this file instead of the terminal.
        :type log_file: Optional[str]
        :raises TableNotMatchingInstance: if either file is not of the expected table type.
        """
        self.meta_csv_doc = meta_in
        self.meta_output_dir = meta_out_dir
        self.cits_csv_doc = cits_in
        self.cits_output_dir = cits_out_dir
        self.strict_sequentiality = strict_sequentiality  # if True, run the transitive-closure check only if the other checks passed without errors
        self.verbose = verbose
        self.log_file = log_file
        configure_logging(verbose, log_file)
        logger.info("Initializing ClosureValidator: meta='%s', cits='%s'", meta_in, cits_in)

        script_dir = dirname(abspath(__file__))  # Directory where the script is located
        with open(join(script_dir, 'messages.yaml'), 'r', encoding='utf-8') as fm:
            self.messages = full_load(fm)

        # Default kwargs for optional configuration of the two Validator instances
        default_kwargs = {'use_meta_endpoint': False, 'verify_id_existence': True, 'use_lmdb': use_lmdb, 'map_size': map_size, 'cache_dir': cache_dir}

        # Merge user-provided kwargs with the defaults
        meta_kwargs = {**default_kwargs, **(meta_kwargs or {})}
        cits_kwargs = {**default_kwargs, **(cits_kwargs or {})}

        # Propagate verbose and log_file to the child validators
        meta_kwargs['verbose'] = verbose
        cits_kwargs['verbose'] = verbose
        meta_kwargs['log_file'] = log_file
        cits_kwargs['log_file'] = log_file

        # Create the Validator instances with the merged kwargs
        self.meta_validator = Validator(self.meta_csv_doc, self.meta_output_dir, **meta_kwargs)
        self.cits_validator = Validator(self.cits_csv_doc, self.cits_output_dir, **cits_kwargs)

        self.helper = Helper()
        self.memory_efficient = use_lmdb

        # Check that each of the two Validator instances received the expected table type
        if self.meta_validator.table_to_process != 'meta_csv':
            raise TableNotMatchingInstance(self.meta_csv_doc, self.meta_validator.table_to_process, 'meta_csv')
        if self.cits_validator.table_to_process != 'cits_csv':
            raise TableNotMatchingInstance(self.cits_csv_doc, self.cits_validator.table_to_process, 'cits_csv')

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensures caches are properly closed."""
        self.close()
        return False

    def close(self):
        """Close caches and clean up resources."""
        if hasattr(self, 'meta_validator'):
            self.meta_validator.close()
        if hasattr(self, 'cits_validator'):
            self.cits_validator.close()
        if hasattr(self, '_meta_positions_cache') and self._meta_positions_cache is not None:
            self._meta_positions_cache.close()
            self._meta_positions_cache = None
        if hasattr(self, '_cits_positions_cache') and self._cits_positions_cache is not None:
            self._cits_positions_cache.close()
            self._cits_positions_cache = None

    def check_closure(self) -> tuple[bool, bool]:
        """
        Check the transitive closure between META-CSV and CITS-CSV.

        Reuses the Union-Finds populated during pass 1; only the position
        caches are built here (from the stored data caches).
        """
        print('Checking transitive closure between metadata and citations...')
        logger.info("Checking transitive closure between metadata and citations")
        meta_is_valid_closure = True
        cits_is_valid_closure = True

        # Reuse the UFs and data caches from pass 1
        meta_uf = self.meta_validator._uf
        cits_uf = self.cits_validator._uf
        meta_cache = self.meta_validator.duplicate_data_cache
        cits_cache = self.cits_validator.duplicate_data_cache

        # Only the position caches are created here
        if self.memory_efficient:
            cache_base = self.meta_validator._cache_dir or '.'
            self._meta_positions_cache = LmdbCache('closure_meta_positions', base_dir=cache_base, map_size=self.meta_validator.map_size)
            self._cits_positions_cache = LmdbCache('closure_cits_positions', base_dir=cache_base, map_size=self.cits_validator.map_size)
        else:
            self._meta_positions_cache = InMemoryCache('closure_meta_positions')
            self._cits_positions_cache = InMemoryCache('closure_cits_positions')
        self._meta_positions_cache.open()
        self._cits_positions_cache.open()

        try:
            # --- Build the position cache from the META data cache ---
            for str_idx, id_value in meta_cache.items():
                row_idx = int(str_idx)
                if id_value:
                    ids = id_value.split(' ')
                    ids_unique = list(set(i for i in ids if i))
                    if not ids_unique:
                        continue
                    pos_entry = {row_idx: {'id': list(range(len(ids)))}}
                    for item in ids_unique:
                        existing = self._meta_positions_cache.get(item)
                        if existing is None:
                            self._meta_positions_cache[item] = [pos_entry]
                        else:
                            existing.append(pos_entry)
                            self._meta_positions_cache[item] = existing

            # --- Build the position cache from the CITS data cache ---
            for str_idx, (citing_id_str, cited_id_str) in cits_cache.items():
                row_idx = int(str_idx)
                for id_value, field_name in (
                    (citing_id_str, 'citing_id'),
                    (cited_id_str, 'cited_id'),
                ):
                    if id_value:
                        ids = id_value.split(' ')
                        ids_unique = list(set(i for i in ids if i))
                        if not ids_unique:
                            continue
                        pos_entry = {row_idx: {field_name: list(range(len(ids)))}}
                        for item in ids_unique:
                            existing = self._cits_positions_cache.get(item)
                            if existing is None:
                                self._cits_positions_cache[item] = [pos_entry]
                            else:
                                existing.append(pos_entry)
                                self._cits_positions_cache[item] = existing

            # --- Check META entities that have no citations ---
            # An entity is "missing citations" when ALL of its IDs are absent from cits_positions_cache.
            # Membership is checked directly in LMDB (O(1) per lookup), so no large Python sets are needed.
            with JSONLStreamIO(self.meta_validator.output_fp_json, 'a') as meta_json_file:
                for _root, br_ids_set in meta_uf.iter_components():
                    if all(id_ not in self._cits_positions_cache for id_ in br_ids_set):
                        table: dict = {}
                        for id_ in br_ids_set:
                            for pos_dict in (self._meta_positions_cache.get(id_) or []):
                                table.update(pos_dict)
                        if table:
                            meta_json_file.write(
                                self.helper.create_error_dict(
                                    validation_level='csv_wellformedness',
                                    error_type='error',
                                    message=self.messages['m24'],
                                    error_label='missing_citations',
                                    located_in='row',
                                    table=table,
                                )
                            )
                            meta_is_valid_closure = False

            # --- Check CITS entities that have no metadata ---
            with JSONLStreamIO(self.cits_validator.output_fp_json, 'a') as cits_json_file:
                for _root, br_ids_set in cits_uf.iter_components():
                    if all(id_ not in self._meta_positions_cache for id_ in br_ids_set):
                        table = {}
                        for id_ in br_ids_set:
                            for pos_dict in (self._cits_positions_cache.get(id_) or []):
                                table.update(pos_dict)
                        if table:
                            cits_json_file.write(
                                self.helper.create_error_dict(
                                    validation_level='csv_wellformedness',
                                    error_type='error',
                                    message=self.messages['m25'],
                                    error_label='missing_metadata',
                                    located_in='row',
                                    table=table,
                                )
                            )
                            cits_is_valid_closure = False

        finally:
            self._meta_positions_cache.close()
            self._cits_positions_cache.close()

        # Write the human-readable validation summaries for both tables
        textual_report_stream_meta = self.helper.create_validation_summary_stream(self.meta_validator.output_fp_json)
        textual_report_stream_cits = self.helper.create_validation_summary_stream(self.cits_validator.output_fp_json)

        with open(self.meta_validator.output_fp_txt, "w", encoding='utf-8') as fm:
            for lm in textual_report_stream_meta:
                fm.write(lm)
        with open(self.cits_validator.output_fp_txt, "w", encoding='utf-8') as fc:
            for lc in textual_report_stream_cits:
                fc.write(lc)

        logger.info("Closure check complete: meta_valid=%s, cits_valid=%s", meta_is_valid_closure, cits_is_valid_closure)

        return (meta_is_valid_closure, cits_is_valid_closure)

    def validate(self) -> tuple[bool, bool]:
        """
        Run the full validation pipeline on both META-CSV and CITS-CSV.

        First validates each table individually, then (unless
        ``strict_sequentiality`` is ``True`` and errors were found) checks the
        transitive closure between the two.

        :return: A two-element tuple ``(meta_is_valid, cits_is_valid)``.
        :rtype: tuple[bool, bool]
        """
        try:
            # Run the individual validation of META-CSV and CITS-CSV
            logger.info("Running individual validation of META-CSV and CITS-CSV")
            meta_is_valid = self.meta_validator.validate()
            cits_is_valid = self.cits_validator.validate()
            logger.info("Individual validation complete: meta_valid=%s, cits_valid=%s", meta_is_valid, cits_is_valid)

            # If errors have already been found and strict_sequentiality is True, skip the closure check
            if self.strict_sequentiality:
                if not meta_is_valid or not cits_is_valid:
                    print('The separate validation of the metadata (META-CSV) and citations (CITS-CSV) tables already detected errors (in one or both documents).')
                    print('Skipping the transitive closure check because strict_sequentiality is True.')
                    logger.info("Skipping closure check due to strict_sequentiality: meta_valid=%s, cits_valid=%s", meta_is_valid, cits_is_valid)
                    return (meta_is_valid, cits_is_valid)

            # Run the transitive closure validation
            meta_is_valid_closure, cits_is_valid_closure = self.check_closure()

            final_meta = bool(meta_is_valid_closure and meta_is_valid)
            final_cits = bool(cits_is_valid_closure and cits_is_valid)
            logger.info("ClosureValidator final result: meta_valid=%s, cits_valid=%s", final_meta, final_cits)
            return (final_meta, final_cits)
        finally:
            logger.info("ClosureValidator process finished. Cleaning up resources...")
            self.close()
            logger.info("ClosureValidator resources cleaned up.")


if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('-i', '--input', dest='input_csv', required=True,
                        help='The path to the CSV document to validate.', type=str)
    parser.add_argument('-o', '--output', dest='output_dir', required=True,
                        help='The path to the directory in which to store the output JSON-L file.', type=str)
    parser.add_argument('-m', '--use-meta', dest='use_meta_endpoint', action='store_true',
                        help='Use the OC Meta endpoint to check if an ID exists.', required=False)
    parser.add_argument('-s', '--no-id-existence', dest='verify_id_existence', action='store_false',
                        help='Skip checking whether IDs are registered somewhere, i.e. use neither the Meta endpoint nor external APIs.',
                        required=False)
    parser.add_argument('--use-lmdb', dest='use_lmdb', action='store_true',
                        default=False,
                        help='Enable LMDB for efficient memory usage with large files (default: False).')
    parser.add_argument('--map-size', dest='map_size', type=int, default=1,
                        help='LMDB map size in GiB (default: 1).',
                        required=False)
    parser.add_argument('--cache-dir', dest='cache_dir', type=str, default=None,
                        help='Base directory under which all LMDB caches are created.',
                        required=False)
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
                        default=False,
                        help='Enable verbose logging output.')
    parser.add_argument('--log-file', dest='log_file', type=str, default=None,
                        help='Write logs to this file instead of the terminal.',
                        required=False)
    args = parser.parse_args()
    v = Validator(
        args.input_csv,
        args.output_dir,
        use_meta_endpoint=args.use_meta_endpoint,
        verify_id_existence=args.verify_id_existence,
        use_lmdb=args.use_lmdb,
        map_size=args.map_size * 1024**3,
        cache_dir=args.cache_dir,
        verbose=args.verbose,
        log_file=args.log_file,
    )
    v.validate()


# To instantiate the class, write:
# v = Validator('path/to/csv/file', 'output/dir/path')  # optionally set use_meta_endpoint to True and/or verify_id_existence to False
# v.validate()  # --> validates, returns the result, and saves the output files
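#
# Analogously, to validate a metadata table and a citations table together (a sketch with
# illustrative paths; the positional argument order is meta_in, meta_out_dir, cits_in, cits_out_dir):
# with ClosureValidator('meta.csv', 'meta_out/', 'cits.csv', 'cits_out/', use_lmdb=True) as cv:
#     meta_ok, cits_ok = cv.validate()  # runs both individual validations, then the closure check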

# FROM THE COMMAND LINE:
# python -m oc_validator.main -i <input csv file path> -o <output dir path> [-m] [-s] [--use-lmdb [--cache-dir <dir>] [--map-size <GiB>]] [-v] [--log-file <path>]
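#
# For example (illustrative paths and values), to validate a large table with LMDB caching,
# a 4 GiB map size, and logging redirected to a file:
# python -m oc_validator.main -i submissions/meta.csv -o results/ --use-lmdb --cache-dir /tmp/ocv_cache --map-size 4 --log-file validation.log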