Coverage for oc_validator / main.py: 75%

660 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 15:46 +0000

1# ISC License 

2# 

3# Copyright (c) 2023-2026, Elia Rizzetto, Silvio Peroni 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any 

6# purpose with or without fee is hereby granted, provided that the above 

7# copyright notice and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, 

12# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM 

13# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR 

14# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR 

15# PERFORMANCE OF THIS SOFTWARE. 

16 

17from csv import DictReader, field_size_limit 

18from yaml import full_load 

19from json import load, dumps 

20from os.path import exists, join, dirname, abspath 

21from os import makedirs, getcwd 

22from re import finditer 

23import tempfile 

24import shutil 

25import lmdb 

26from typing import Optional 

27from oc_validator.helper import Helper, read_csv, CSVStreamReader, JSONLStreamIO 

28from oc_validator.csv_wellformedness import Wellformedness 

29from oc_validator.id_syntax import IdSyntax 

30from oc_validator.id_existence import IdExistence 

31from oc_validator.semantics import Semantics 

32from oc_validator.table_reader import read_metadata_row, read_citations_row 

33from oc_validator.lmdb_cache import LmdbCache, InMemoryCache, LmdbUnionFind, InMemoryUnionFind, UnionFind 

34from tqdm import tqdm 

35from argparse import ArgumentParser 

36from oc_validator import configure_logging, logger 

37from time import time 

38 

39 

40# --- Custom Exception classes. --- 

class ValidationError(Exception):
    """Base class for errors related to the validation process."""


class InvalidTableError(ValidationError):
    """Raised when the submitted table cannot be identified as META-CSV or CITS-CSV, therefore cannot be processed.

    :param input_fp: path to the CSV file that failed the basic structural checks.
    """
    def __init__(self, input_fp):
        # NOTE: the last two sentences used to be concatenated without a
        # separator ('…("citing_id","cited_id")Refer to the documentation…');
        # the '. ' added before 'Refer' fixes the user-facing message.
        super().__init__('The submitted table does not meet the required basic formatting standards. '
                         'Please ensure that both the metadata and citations tables are valid CSV files following the correct structure: '
                         'the metadata table must have the following columns: "id", "title", "author", "pub_date", "venue", "volume", "issue", "page", "type", "publisher", "editor"; '
                         'the citations table must have either 4 columns ("citing_id", "citing_publication_date", "cited_id", "cited_publication_date") or two columns ("citing_id","cited_id"). '
                         'Refer to the documentation at https://github.com/opencitations/crowdsourcing/blob/main/README.md for the expected format and structure before resubmitting your deposit.')
        self.input_fp = input_fp


class TableNotMatchingInstance(ValidationError):
    """Raised when the table submitted for a specific Validator instance in ClosureValidator does not match the process validation type,
    e.g. a CITS-CSV table is submitted for an instance of Validator that is intended to process a META-CSV table.

    :param input_fp: path to the offending CSV file.
    :param detected_table_type: table type actually detected in the file.
    :param correct_table_type: table type this Validator instance expects.
    """
    def __init__(self, input_fp, detected_table_type, correct_table_type):
        super().__init__(f'The submitted table in file "{input_fp}" is of type {detected_table_type}, but should be of type {correct_table_type} instead.')
        self.input_fp = input_fp
        self.detected_table_type = detected_table_type
        self.correct_table_type = correct_table_type

64 

65# --- Class for the main process; validates one document at a time via the Validator.validate() method. --- 

66class Validator: 

    def __init__(self, csv_doc: str, output_dir: str, use_meta_endpoint=False, verify_id_existence=True,
                 use_lmdb=False, map_size: int = 1 * 1024**3, cache_dir: Optional[str] = None, verbose: bool = False,
                 log_file: Optional[str] = None):
        """
        Initialize the Validator.

        Detects the table type of the input file (via :meth:`process_selector`),
        loads the validation messages and ID/type alignment resources shipped
        next to this module, prepares the output file paths, and opens the
        ID-existence cache (LMDB-backed or in-memory).

        :param csv_doc: Path to the CSV file to validate
        :param output_dir: Directory to store validation output
        :param use_meta_endpoint: Whether to use OC Meta endpoint for ID existence checks
        :param verify_id_existence: Whether to verify ID existence
        :param use_lmdb: If True, use LMDB for caching (recommended for large files)
        :param map_size: Maximum size in bytes for each LMDB environment (default 1 GB)
        :param cache_dir: Optional base directory under which all LMDB caches are created
        :param verbose: If True, enable DEBUG-level logging output
        :param log_file: If provided, write logs to this file instead of the terminal
        :raises InvalidTableError: if the table cannot be recognised as META-CSV or CITS-CSV
            (raised by :meth:`process_selector`).
        """
        self.csv_doc = csv_doc
        self.verbose = verbose
        # Logging must be configured before any logger call below.
        configure_logging(verbose, log_file)
        logger.debug("Initializing Validator for '%s' (output: '%s')", csv_doc, output_dir)
        self.csv_stream = CSVStreamReader(csv_doc)  # Use streaming instead of loading all data
        # Determines which validation pipeline (validate_meta / validate_cits) runs.
        self.table_to_process = self.process_selector()
        self.helper = Helper()
        self.wellformed = Wellformedness()
        self.syntax = IdSyntax()
        self.existence = IdExistence(use_meta_endpoint=use_meta_endpoint)
        self.semantics = Semantics()
        # Validation messages and the ID-type alignment table are resources
        # shipped alongside this module, not relative to the CWD.
        script_dir = dirname(abspath(__file__))  # Directory where the script is located
        with open(join(script_dir, 'messages.yaml'), 'r', encoding='utf-8') as fm:
            self.messages = full_load(fm)
        with open(join(script_dir, 'id_type_alignment.json'), 'r', encoding='utf-8') as fa:
            self.id_type_dict = load(fa)
        self.output_dir = output_dir
        # Output dir must exist before _make_output_filepath probes it.
        if not exists(self.output_dir):
            makedirs(self.output_dir)
        if self.table_to_process == 'meta_csv':
            self.output_fp_json = self._make_output_filepath('out_validate_meta', 'jsonl')
            self.output_fp_txt = self._make_output_filepath('meta_validation_summary', 'txt')
        elif self.table_to_process == 'cits_csv':
            self.output_fp_json = self._make_output_filepath('out_validate_cits', 'jsonl')
            self.output_fp_txt = self._make_output_filepath('cits_validation_summary', 'txt')

        logger.debug("Detected table type: %s", self.table_to_process)
        logger.debug("Output files: jsonl='%s', txt='%s'", self.output_fp_json, self.output_fp_txt)

        # Initialize cache based on memory_efficient flag
        self.memory_efficient = use_lmdb
        self.map_size = map_size
        self._cache_dir = cache_dir

        # Cache name is derived from the input path so concurrent validations
        # of different files do not collide.
        cache_name = f'validator_{hash(csv_doc)}'
        if use_lmdb:
            self.id_cache = LmdbCache(cache_name, base_dir=self._cache_dir or '.', map_size=self.map_size)
        else:
            self.id_cache = InMemoryCache(cache_name)

        logger.info("Cache type: %s", 'LMDB' if use_lmdb else 'in-memory')

        # Open the cache
        self.id_cache.open()

        self.verify_id_existence = verify_id_existence
        # Union-Find structures and the duplicate-detection cache are created
        # lazily by validate_meta()/validate_cits() and released by close().
        self._uf = None
        self._uf_env = None
        self._uf_tmp_dir = None
        self.duplicate_data_cache = None

133 

134 def __enter__(self): 

135 """Context manager entry.""" 

136 return self 

137 

138 def __exit__(self, exc_type, exc_val, exc_tb): 

139 """Context manager exit - ensures cache is properly closed.""" 

140 self.close() 

141 return False 

142 

143 def close(self): 

144 """Close the cache and clean up resources.""" 

145 if hasattr(self, 'id_cache') and self.id_cache is not None: 

146 self.id_cache.close() 

147 if hasattr(self, '_uf_env') and self._uf_env is not None: 

148 self._uf_env.close() 

149 self._uf_env = None 

150 if hasattr(self, '_uf_tmp_dir') and self._uf_tmp_dir is not None: 

151 shutil.rmtree(self._uf_tmp_dir, ignore_errors=True) 

152 self._uf_tmp_dir = None 

153 if hasattr(self, 'duplicate_data_cache') and self.duplicate_data_cache is not None: 

154 self.duplicate_data_cache.close() 

155 self.duplicate_data_cache = None 

156 

157 def process_selector(self) -> str: 

158 """ 

159 Detect the table type by streaming the first few rows. 

160 

161 Reads up to 10 rows to determine whether the CSV is a META-CSV or a 

162 CITS-CSV, then returns the corresponding identifier string. 

163 

164 :return: ``'meta_csv'`` or ``'cits_csv'``. 

165 :rtype: str 

166 :raises InvalidTableError: if the table structure cannot be recognised. 

167 """ 

168 # Read first few rows to determine table type 

169 sample_rows = [] 

170 for i, row in enumerate(self.csv_stream): 

171 if i >= 10: # Only need first 10 rows to determine type 

172 break 

173 sample_rows.append(row) 

174 

175 if not sample_rows: 

176 raise InvalidTableError(self.csv_doc) 

177 

178 process_type = None 

179 try: 

180 if all(set(row.keys()) == {"id", "title", "author", "pub_date", "venue", "volume", "issue", "page", "type", 

181 "publisher", "editor"} for row in sample_rows): 

182 process_type = 'meta_csv' 

183 return process_type 

184 elif all(set(row.keys()) == {'citing_id', 'citing_publication_date', 'cited_id', 'cited_publication_date'} for row in sample_rows): 

185 process_type = 'cits_csv' 

186 return process_type 

187 elif all(set(row.keys()) == {'citing_id', 'cited_id'} for row in sample_rows): # support also Index tables with no publication dates 

188 process_type = 'cits_csv' 

189 return process_type 

190 else: 

191 raise InvalidTableError(self.csv_doc) 

192 except KeyError: 

193 raise InvalidTableError(self.csv_doc) 

194 

195 def _make_output_filepath(self, base_filename: str, extension: str) -> str: 

196 """ 

197 Generate a unique output filepath. 

198 

199 If a file with the base name already exists in the output directory, 

200 an incrementing counter is appended to the filename. 

201 

202 :param base_filename: Base name for the output file (without extension). 

203 :type base_filename: str 

204 :param extension: File extension (e.g. ``'jsonl'``, ``'txt'``). 

205 :type extension: str 

206 :return: Absolute path to a non-existing output file. 

207 :rtype: str 

208 """ 

209 

210 full_path = join(self.output_dir, f"{base_filename}.{extension}") 

211 counter = 1 

212 

213 # If filepath already exists, increment the counter and check for existing files 

214 while exists(full_path): 

215 full_path = join(self.output_dir, f"{base_filename}_{counter}.{extension}") 

216 counter += 1 

217 

218 return full_path 

219 

220 def validate(self) -> bool: 

221 """ 

222 Run the full validation pipeline on the input CSV document. 

223 

224 Dispatches to :meth:`validate_meta` or :meth:`validate_cits` depending 

225 on the detected table type. 

226 

227 :return: ``True`` if the table is valid (no issues found), ``False`` otherwise. 

228 :rtype: bool 

229 """ 

230 logger.info("Starting validation of '%s'", self.csv_doc) 

231 try: 

232 start = time() 

233 if self.table_to_process == 'meta_csv': 

234 result = self.validate_meta() 

235 elif self.table_to_process == 'cits_csv': 

236 result = self.validate_cits() 

237 logger.info("Validation of '%s' complete. Valid: %s", self.csv_doc, result) 

238 return result 

239 finally: 

240 logger.info(f"Cleaning up resources for {self.table_to_process} table...") 

241 self.existence.close() 

242 if self.id_cache._is_open: 

243 self.id_cache.close() 

244 logger.info(f"Process finished in {(time() - start)/60:.2f} minutes.") 

245 

246 

    def validate_meta(self) -> bool:
        """
        Validate an instance of META-CSV using JSON-Lines streaming output.

        For each row, runs up to four validation levels (CSV well-formedness,
        external ID syntax, ID existence, row semantics), streaming every
        error/warning dict to ``self.output_fp_json``. After the row loop it
        appends a report on duplicate bibliographic entities (detected via the
        Union-Find structure built while streaming) and writes a human-readable
        summary to ``self.output_fp_txt``.

        :return: True if the table is valid (i.e. no issues found), False otherwise.
        """
        logger.info("Validating META-CSV: '%s'", self.csv_doc)
        messages = self.messages
        id_type_dict = self.id_type_dict

        # Set up Union-Find and cache for duplicate detection
        # NOTE: if self.memory_efficient is True, these open LMDB envs which must be
        # closed (deleting related dir) via self.close()
        if self.memory_efficient:
            tmp_base = self._cache_dir or '.'
            uf_tmp_dir = tempfile.mkdtemp(prefix='uf_dup_meta_', dir=tmp_base)
            uf_env = lmdb.open(uf_tmp_dir, map_size=self.map_size, sync=False, metasync=False)
            uf = LmdbUnionFind(uf_env)
        else:
            uf = InMemoryUnionFind()
            uf_tmp_dir = None
            uf_env = None

        # Keep references on the instance so close() can release them.
        self._uf = uf
        self._uf_tmp_dir = uf_tmp_dir
        self._uf_env = uf_env

        dup_cache_name = f'dup_meta_{abs(hash(self.csv_doc))}'
        if self.memory_efficient:
            duplicate_data_cache = LmdbCache(dup_cache_name, base_dir=self._cache_dir or '.', map_size=self.map_size)
        else:
            duplicate_data_cache = InMemoryCache(dup_cache_name)
        duplicate_data_cache.open()
        self.duplicate_data_cache = duplicate_data_cache

        # Open JSON-L file for streaming output
        with JSONLStreamIO(self.output_fp_json, 'a') as jsonl_file:
            for row_idx, row in enumerate(tqdm(self.csv_stream.stream(), desc="Validating")):
                # Recreate SPARQL client periodically to prevent memory growth
                if row_idx > 0 and row_idx % 10000 == 0 and self.existence.use_meta_endpoint:
                    self.existence._recreate_sparql_client()

                row_ok = True  # switch for row well-formedness
                id_ok = True  # switch for id field well-formedness
                type_ok = True  # switch for type field well-formedness

                # Collect ID data for duplicate detection
                id_value = row.get('id', '')
                duplicate_data_cache[str(row_idx)] = id_value
                if id_value:
                    items = id_value.split(' ')
                    non_empty = [i for i in items if i]
                    if non_empty:
                        # Union all IDs of this row so rows sharing any ID land in one group.
                        uf.find(non_empty[0])
                        for _i in range(1, len(non_empty)):
                            uf.union(non_empty[0], non_empty[_i])

                missing_required_fields = self.wellformed.get_missing_values(
                    row)  # dict w/ positions of error in row; empty if row is fine
                if missing_required_fields:
                    message = messages['m17']
                    table = {row_idx: missing_required_fields}
                    error = self.helper.create_error_dict(
                        validation_level='csv_wellformedness',
                        error_type='error',
                        message=message,
                        error_label='required_fields',
                        located_in='field',
                        table=table)
                    jsonl_file.write(error)
                    row_ok = False

                # Parse row into structured object
                row_obj = read_metadata_row(row)

                for field, value in row.items():

                    if field == 'id':
                        # Use structured object's parsed id field
                        items = row_obj.id
                        if items:
                            br_ids_set = set()  # set where to put well-formed br IDs only

                            for item_idx, item in enumerate(items):

                                if item == '':
                                    message = messages['m1']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='extra_space',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                elif not self.wellformed.wellformedness_br_id(item):
                                    message = messages['m2']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='br_id_format',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                else:
                                    if item not in br_ids_set:
                                        br_ids_set.add(item)
                                    else:  # in-field duplication of the same ID
                                        table = {row_idx: {field: [i for i, v in enumerate(items) if v == item]}}
                                        message = messages['m6']

                                        error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                              error_type='error',
                                                                              message=message,
                                                                              error_label='duplicate_id',
                                                                              located_in='item',
                                                                              table=table)  # valid=False
                                        jsonl_file.write(error)

                                    # 2nd validation level: EXTERNAL SYNTAX OF ID (BIBLIOGRAPHIC RESOURCE)
                                    if not self.syntax.check_id_syntax(item):
                                        message = messages['m19']
                                        table = {row_idx: {field: [item_idx]}}
                                        error = self.helper.create_error_dict(validation_level='external_syntax',
                                                                              error_type='error',
                                                                              message=message,
                                                                              error_label='br_id_syntax',
                                                                              located_in='item',
                                                                              table=table)
                                        jsonl_file.write(error)
                                    # 3rd validation level: EXISTENCE OF ID (BIBLIOGRAPHIC RESOURCE)
                                    else:
                                        if self.verify_id_existence:  # if verify_id_existence is False just skip these operations
                                            message = messages['m20']
                                            table = {row_idx: {field: [item_idx]}}
                                            if item not in self.id_cache:
                                                if not self.existence.check_id_existence(item):
                                                    error = self.helper.create_error_dict(validation_level='existence',
                                                                                          error_type='warning',
                                                                                          message=message,
                                                                                          error_label='br_id_existence',
                                                                                          located_in='item',
                                                                                          table=table, valid=True)
                                                    jsonl_file.write(error)
                                                    self.id_cache[item] = False
                                                else:
                                                    self.id_cache[item] = True
                                            elif self.id_cache[item] is False:
                                                # Cached miss: re-emit the warning without re-querying.
                                                error = self.helper.create_error_dict(validation_level='existence',
                                                                                      error_type='warning',
                                                                                      message=message,
                                                                                      error_label='br_id_existence',
                                                                                      located_in='item',
                                                                                      table=table, valid=True)
                                                jsonl_file.write(error)

                            if len(br_ids_set) != len(items):  # --> some well-formedness error occurred in the id field
                                id_ok = False

                    elif field == 'title':
                        if value:
                            if value.isupper():
                                message = messages['m8']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='warning',
                                                                      message=message,
                                                                      error_label='uppercase_title',
                                                                      located_in='item',
                                                                      table=table,
                                                                      valid=True)
                                jsonl_file.write(error)

                    elif field == 'author' or field == 'editor':
                        # Use structured object's parsed field
                        if field == 'author':
                            agents = row_obj.author
                        else:  # field == 'editor'
                            agents = row_obj.editor

                        if agents:
                            items = agents  # Already parsed list of AgentItem objects

                            # Check in-field duplication based on shared RA IDs
                            dup_groups = self.wellformed.check_duplicate_ra_by_id(items)
                            for dup_indices in dup_groups:
                                table = {row_idx: {field: dup_indices}}
                                message = messages['m26']

                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='duplicate_ra',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                            for item_idx, item in enumerate(items):
                                # Check orphan RA ID using the raw string
                                if self.wellformed.orphan_ra_id(item._raw):
                                    message = messages['m10']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='warning',
                                                                          message=message,
                                                                          error_label='orphan_ra_id',
                                                                          located_in='item',
                                                                          table=table,
                                                                          valid=True)
                                    jsonl_file.write(error)

                                # Validate using the raw string
                                if not self.wellformed.wellformedness_people_item(item._raw):
                                    message = messages['m9']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='people_item_format',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                else:

                                    # Use structured object's ids attribute
                                    ids = item.ids

                                    for id in ids:
                                        # 2nd validation level: EXTERNAL SYNTAX OF ID (RESPONSIBLE AGENT)
                                        if not self.syntax.check_id_syntax(id):
                                            message = messages['m21']
                                            table = {row_idx: {field: [item_idx]}}
                                            error = self.helper.create_error_dict(validation_level='external_syntax',
                                                                                  error_type='error',
                                                                                  message=message,
                                                                                  error_label='ra_id_syntax',
                                                                                  located_in='item',
                                                                                  table=table)
                                            jsonl_file.write(error)
                                        # 3rd validation level: EXISTENCE OF ID (RESPONSIBLE AGENT)
                                        else:
                                            if self.verify_id_existence:  # if verify_id_existence is False just skip these operations
                                                message = messages['m22']
                                                table = {row_idx: {field: [item_idx]}}
                                                if id not in self.id_cache:
                                                    if not self.existence.check_id_existence(id):
                                                        error = self.helper.create_error_dict(validation_level='existence',
                                                                                              error_type='warning',
                                                                                              message=message,
                                                                                              error_label='ra_id_existence',
                                                                                              located_in='item',
                                                                                              table=table,
                                                                                              valid=True)
                                                        jsonl_file.write(error)
                                                        self.id_cache[id] = False
                                                    else:
                                                        self.id_cache[id] = True
                                                elif self.id_cache[id] is False:
                                                    error = self.helper.create_error_dict(validation_level='existence',
                                                                                          error_type='warning',
                                                                                          message=message,
                                                                                          error_label='ra_id_existence',
                                                                                          located_in='item',
                                                                                          table=table,
                                                                                          valid=True)
                                                    jsonl_file.write(error)
                    elif field == 'pub_date':
                        if value:
                            if not self.wellformed.wellformedness_date(value):
                                message = messages['m3']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='date_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                    elif field == 'venue':
                        # Use structured object's parsed field
                        venue = row_obj.venue
                        if venue:

                            # Check orphan venue ID using the raw string
                            if self.wellformed.orphan_venue_id(venue._raw):
                                message = messages['m15']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='warning',
                                                                      message=message,
                                                                      error_label='orphan_venue_id',
                                                                      located_in='item',
                                                                      table=table,
                                                                      valid=True)
                                jsonl_file.write(error)

                            # Validate using the raw string
                            if not self.wellformed.wellformedness_venue(venue._raw):
                                message = messages['m12']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='venue_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                            else:
                                # Use structured object's ids attribute
                                ids = venue.ids

                                for id in ids:

                                    # 2nd validation level: EXTERNAL SYNTAX OF ID (BIBLIOGRAPHIC RESOURCE)
                                    if not self.syntax.check_id_syntax(id):
                                        message = messages['m19']
                                        table = {row_idx: {field: [0]}}
                                        error = self.helper.create_error_dict(validation_level='external_syntax',
                                                                              error_type='error',
                                                                              message=message,
                                                                              error_label='br_id_syntax',
                                                                              located_in='item',
                                                                              table=table)
                                        jsonl_file.write(error)
                                    # 3rd validation level: EXISTENCE OF ID (BIBLIOGRAPHIC RESOURCE)
                                    else:
                                        if self.verify_id_existence:  # if verify_id_existence is False just skip these operations
                                            message = messages['m20']
                                            table = {row_idx: {field: [0]}}
                                            if id not in self.id_cache:
                                                if not self.existence.check_id_existence(id):
                                                    error = self.helper.create_error_dict(validation_level='existence',
                                                                                          error_type='warning',
                                                                                          message=message,
                                                                                          error_label='br_id_existence',
                                                                                          located_in='item',
                                                                                          table=table,
                                                                                          valid=True)
                                                    jsonl_file.write(error)
                                                    self.id_cache[id] = False
                                                else:
                                                    self.id_cache[id] = True
                                            elif self.id_cache[id] is False:
                                                error = self.helper.create_error_dict(validation_level='existence',
                                                                                      error_type='warning',
                                                                                      message=message,
                                                                                      error_label='br_id_existence',
                                                                                      located_in='item',
                                                                                      table=table,
                                                                                      valid=True)
                                                jsonl_file.write(error)

                    elif field == 'volume':
                        if value:
                            if not self.wellformed.wellformedness_volume_issue(value):
                                message = messages['m13']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='volume_issue_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                    elif field == 'issue':
                        if value:
                            if not self.wellformed.wellformedness_volume_issue(value):
                                message = messages['m13']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='volume_issue_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                    elif field == 'page':
                        if value:
                            if not self.wellformed.wellformedness_page(value):
                                message = messages['m14']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='page_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)
                            else:
                                # Well-formed page string: still warn on a decreasing interval.
                                if not self.wellformed.check_page_interval(value):
                                    message = messages['m18']
                                    table = {row_idx: {field: [0]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='warning',
                                                                          message=message,
                                                                          error_label='page_interval',
                                                                          located_in='item',
                                                                          table=table,
                                                                          valid=True)
                                    jsonl_file.write(error)

                    elif field == 'type':
                        if value:
                            if not self.wellformed.wellformedness_type(value):
                                message = messages['m16']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='type_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                                type_ok = False

                    elif field == 'publisher':
                        # Use structured object's parsed field
                        publishers = row_obj.publisher
                        if publishers:
                            items = publishers  # Already parsed list of AgentItem objects

                            # Check in-field duplication based on raw string exact match
                            dup_groups = self.wellformed.check_duplicate_publisher_by_raw(items)
                            for dup_indices in dup_groups:
                                table = {row_idx: {field: dup_indices}}
                                message = messages['m26']

                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='duplicate_ra',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

                            for item_idx, item in enumerate(items):
                                # Check orphan RA ID using the raw string
                                if self.wellformed.orphan_ra_id(item._raw):
                                    message = messages['m10']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='warning',
                                                                          message=message,
                                                                          error_label='orphan_ra_id',
                                                                          located_in='item',
                                                                          table=table,
                                                                          valid=True)
                                    jsonl_file.write(error)

                                # Validate using the raw string
                                if not self.wellformed.wellformedness_publisher_item(item._raw):
                                    message = messages['m9']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='publisher_format',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                else:

                                    # Use structured object's ids attribute
                                    ids = item.ids

                                    for id in ids:

                                        # 2nd validation level: EXTERNAL SYNTAX OF ID (RESPONSIBLE AGENT)
                                        if not self.syntax.check_id_syntax(id):
                                            message = messages['m21']
                                            table = {row_idx: {field: [item_idx]}}
                                            error = self.helper.create_error_dict(validation_level='external_syntax',
                                                                                  error_type='error',
                                                                                  message=message,
                                                                                  error_label='ra_id_syntax',
                                                                                  located_in='item',
                                                                                  table=table)
                                            jsonl_file.write(error)
                                        # 3rd validation level: EXISTENCE OF ID (RESPONSIBLE AGENT)
                                        else:
                                            if self.verify_id_existence:  # if verify_id_existence is False just skip these operations
                                                message = messages['m22']
                                                table = {row_idx: {field: [item_idx]}}
                                                if id not in self.id_cache:
                                                    if not self.existence.check_id_existence(id):
                                                        error = self.helper.create_error_dict(validation_level='existence',
                                                                                              error_type='warning',
                                                                                              message=message,
                                                                                              error_label='ra_id_existence',
                                                                                              located_in='item',
                                                                                              table=table,
                                                                                              valid=True)
                                                        jsonl_file.write(error)
                                                        self.id_cache[id] = False
                                                    else:
                                                        self.id_cache[id] = True
                                                elif self.id_cache[id] is False:
                                                    error = self.helper.create_error_dict(validation_level='existence',
                                                                                          error_type='warning',
                                                                                          message=message,
                                                                                          error_label='ra_id_existence',
                                                                                          located_in='item',
                                                                                          table=table,
                                                                                          valid=True)
                                                    jsonl_file.write(error)

                if row_ok and id_ok and type_ok:  # row semantics is checked only when the involved parts are well-formed

                    invalid_semantics = self.semantics.check_semantics(row, id_type_dict)
                    if invalid_semantics:
                        message = messages['m23']
                        table = {row_idx: invalid_semantics}
                        error = self.helper.create_error_dict(validation_level='semantics',
                                                              error_type='error',
                                                              message=message,
                                                              error_label='row_semantics',
                                                              located_in='field',
                                                              table=table)
                        jsonl_file.write(error)

            # GET DUPLICATE BIBLIOGRAPHIC ENTITIES (LMDB-backed, no in-memory entity list needed)
            duplicate_report = self.wellformed.get_duplicates_meta(
                uf=uf, data_cache=duplicate_data_cache, messages=messages)
            for error in duplicate_report:
                jsonl_file.write(error)

        logger.info("META-CSV validation complete, writing summary to '%s'", self.output_fp_txt)

        # write human-readable validation summary to txt file
        textual_report_stream = self.helper.create_validation_summary_stream(self.output_fp_json)
        with open(self.output_fp_txt, 'w', encoding='utf-8') as f:
            for l in textual_report_stream:
                f.write(l)

        # An empty JSONL error stream means no issues were recorded: the table is valid.
        is_valid = JSONLStreamIO(self.output_fp_json).is_empty()
        logger.info("META-CSV validation result for '%s': %s", self.csv_doc, 'valid' if is_valid else 'invalid')
        return is_valid

793 

    def validate_cits(self) -> bool:
        """
        Validates an instance of CITS-CSV using JSON-Lines streaming output.

        The table is streamed row by row; every detected problem is appended as a
        JSON object to ``self.output_fp_json``, and a human-readable summary is
        written to ``self.output_fp_txt`` afterwards.

        :return: True if the table is valid (i.e. no issues found), False otherwise.
        """
        logger.info("Validating CITS-CSV: '%s'", self.csv_doc)
        messages = self.messages

        # Set up Union-Find and cache for duplicate detection. In memory-efficient
        # mode both live in LMDB (the Union-Find inside a fresh temp directory) so
        # they can grow beyond available RAM.
        if self.memory_efficient:
            tmp_base = self._cache_dir or '.'
            uf_tmp_dir = tempfile.mkdtemp(prefix='uf_dup_cits_', dir=tmp_base)
            uf_env = lmdb.open(uf_tmp_dir, map_size=self.map_size, sync=False, metasync=False)
            uf = LmdbUnionFind(uf_env)
        else:
            uf = InMemoryUnionFind()
            uf_tmp_dir = None
            uf_env = None

        # Keep references on the instance: ClosureValidator.check_closure() reuses
        # self._uf and self.duplicate_data_cache after this pass completes.
        self._uf = uf
        self._uf_tmp_dir = uf_tmp_dir
        self._uf_env = uf_env

        # One cache entry per row: the raw (citing_id, cited_id) strings.
        dup_cache_name = f'dup_cits_{abs(hash(self.csv_doc))}'
        if self.memory_efficient:
            duplicate_data_cache = LmdbCache(dup_cache_name, base_dir=self._cache_dir or '.', map_size=self.map_size)
        else:
            duplicate_data_cache = InMemoryCache(dup_cache_name)
        duplicate_data_cache.open()
        self.duplicate_data_cache = duplicate_data_cache

        # Open JSON-L file for streaming output
        with JSONLStreamIO(self.output_fp_json, 'a') as jsonl_file:
            for row_idx, row in enumerate(tqdm(self.csv_stream.stream(), desc="Validating")):
                # Recreate SPARQL client periodically to prevent memory growth
                if row_idx > 0 and row_idx % 10000 == 0 and self.existence.use_meta_endpoint:
                    self.existence._recreate_sparql_client()

                # Collect ID data for duplicate detection: all IDs appearing in the
                # same field value are merged into one Union-Find component.
                citing_id = row.get('citing_id', '')
                cited_id = row.get('cited_id', '')
                duplicate_data_cache[str(row_idx)] = (citing_id, cited_id)
                for id_value in (citing_id, cited_id):
                    if id_value:
                        items = id_value.split(' ')
                        non_empty = [i for i in items if i]
                        if non_empty:
                            uf.find(non_empty[0])  # register even a single-ID component
                            for _i in range(1, len(non_empty)):
                                uf.union(non_empty[0], non_empty[_i])

                # Parse row into structured object
                row_obj = read_citations_row(row)

                for field, value in row.items():
                    if field == 'citing_id' or field == 'cited_id':
                        # Use structured object's parsed field
                        if field == 'citing_id':
                            items = row_obj.citing_id
                        else:  # field == 'cited_id'
                            items = row_obj.cited_id

                        if not items:  # Check required fields
                            message = messages['m7']
                            table = {row_idx: {field: None}}
                            error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                  error_type='error',
                                                                  message=message,
                                                                  error_label='required_value_cits',
                                                                  located_in='field',
                                                                  table=table)
                            jsonl_file.write(error)
                        else:  # i.e. if string is not empty...
                            ids_set = set()  # set where to put valid IDs only

                            for item_idx, item in enumerate(items):

                                # 1st validation level: CSV WELL-FORMEDNESS
                                if item == '':
                                    message = messages['m1']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='extra_space',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                elif not self.wellformed.wellformedness_br_id(item):
                                    message = messages['m2']
                                    table = {row_idx: {field: [item_idx]}}
                                    error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                          error_type='error',
                                                                          message=message,
                                                                          error_label='br_id_format',
                                                                          located_in='item',
                                                                          table=table)
                                    jsonl_file.write(error)

                                else:
                                    if item not in ids_set:
                                        ids_set.add(item)
                                    else:  # in-field duplication of the same ID
                                        # report every position at which the duplicated ID occurs
                                        table = {row_idx: {field: [i for i, v in enumerate(items) if v == item]}}
                                        message = messages['m6']

                                        error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                              error_type='error',
                                                                              message=message,
                                                                              error_label='duplicate_id',
                                                                              located_in='item',
                                                                              table=table)  # 'valid'=False
                                        jsonl_file.write(error)
                                    # 2nd validation level: EXTERNAL SYNTAX OF ID (BIBLIOGRAPHIC RESOURCE)
                                    if not self.syntax.check_id_syntax(item):
                                        message = messages['m19']
                                        table = {row_idx: {field: [item_idx]}}
                                        error = self.helper.create_error_dict(validation_level='external_syntax',
                                                                              error_type='error',
                                                                              message=message,
                                                                              error_label='br_id_syntax',
                                                                              located_in='item',
                                                                              table=table)
                                        jsonl_file.write(error)
                                    # 3rd validation level: EXISTENCE OF ID (BIBLIOGRAPHIC RESOURCE)
                                    else:
                                        if self.verify_id_existence:  # if verify_id_existence is False just skip these operations
                                            message = messages['m20']
                                            table = {row_idx: {field: [item_idx]}}
                                            # self.id_cache memoises lookup results so each
                                            # distinct ID is queried at most once; misses are
                                            # re-reported as warnings on later occurrences.
                                            if item not in self.id_cache:
                                                if not self.existence.check_id_existence(item):
                                                    error = self.helper.create_error_dict(validation_level='existence',
                                                                                          error_type='warning',
                                                                                          message=message,
                                                                                          error_label='br_id_existence',
                                                                                          located_in='item',
                                                                                          table=table, valid=True)
                                                    jsonl_file.write(error)
                                                    self.id_cache[item] = False
                                                else:
                                                    self.id_cache[item] = True
                                            elif self.id_cache[item] is False:
                                                error = self.helper.create_error_dict(validation_level='existence',
                                                                                      error_type='warning',
                                                                                      message=message,
                                                                                      error_label='br_id_existence',
                                                                                      located_in='item',
                                                                                      table=table, valid=True)
                                                jsonl_file.write(error)

                    elif field == 'citing_publication_date' or field == 'cited_publication_date':
                        if value:
                            if not self.wellformed.wellformedness_date(value):
                                message = messages['m3']
                                table = {row_idx: {field: [0]}}
                                error = self.helper.create_error_dict(validation_level='csv_wellformedness',
                                                                      error_type='error',
                                                                      message=message,
                                                                      error_label='date_format',
                                                                      located_in='item',
                                                                      table=table)
                                jsonl_file.write(error)

            # GET SELF-CITATIONS AND DUPLICATE CITATIONS (LMDB-backed, no in-memory entity list needed)
            duplicate_report = self.wellformed.get_duplicates_cits(
                uf=uf, data_cache=duplicate_data_cache, messages=messages)
            for error in duplicate_report:
                jsonl_file.write(error)

        logger.info("CITS-CSV validation complete, writing summary to '%s'", self.output_fp_txt)

        # write human-readable validation summary to txt file
        textual_report_stream = self.helper.create_validation_summary_stream(self.output_fp_json)
        with open(self.output_fp_txt, "w", encoding='utf-8') as f:
            for l in textual_report_stream:
                f.write(l)

        # The table is valid iff no error/warning objects were written to the JSON-L report.
        is_valid = JSONLStreamIO(self.output_fp_json).is_empty()
        logger.info("CITS-CSV validation result for '%s': %s", self.csv_doc, 'valid' if is_valid else 'invalid')
        return is_valid

975 

976 

class ClosureValidator:
    """
    Validate a META-CSV and a CITS-CSV together, checking both their
    individual correctness and the transitive closure between the two tables.

    The closure check verifies that every entity referenced in citations has
    corresponding metadata and vice versa.
    """

    def __init__(self, meta_in: str, meta_out_dir: str, cits_in: str, cits_out_dir: str,
                 strict_sequentiality: bool = False, meta_kwargs: Optional[dict] = None,
                 cits_kwargs: Optional[dict] = None, use_lmdb: bool = False,
                 map_size: int = 1 * 1024**3, cache_dir: Optional[str] = None,
                 verbose: bool = False, log_file: Optional[str] = None) -> None:
        """
        Initialise the ClosureValidator with META-CSV and CITS-CSV file paths.

        Creates two internal :class:`Validator` instances and verifies that each
        receives the expected table type.

        :param meta_in: Path to the META-CSV file.
        :type meta_in: str
        :param meta_out_dir: Directory for META-CSV validation output.
        :type meta_out_dir: str
        :param cits_in: Path to the CITS-CSV file.
        :type cits_in: str
        :param cits_out_dir: Directory for CITS-CSV validation output.
        :type cits_out_dir: str
        :param strict_sequentiality: If ``True``, skip the closure check when
            the individual validations already report errors. Defaults to ``False``.
        :type strict_sequentiality: bool
        :param meta_kwargs: Extra keyword arguments forwarded to the META-CSV
            :class:`Validator`.
        :type meta_kwargs: Optional[dict]
        :param cits_kwargs: Extra keyword arguments forwarded to the CITS-CSV
            :class:`Validator`.
        :type cits_kwargs: Optional[dict]
        :param use_lmdb: If ``True``, use LMDB for caching (recommended for large files).
        :type use_lmdb: bool
        :param map_size: Maximum size in bytes for each LMDB environment (default 1 GB).
        :type map_size: int
        :param cache_dir: Optional base directory under which all LMDB caches are created.
        :type cache_dir: Optional[str]
        :param verbose: If ``True``, enable INFO-level logging output.
        :type verbose: bool
        :param log_file: If provided, write logs to this file instead of the terminal.
        :type log_file: Optional[str]
        :raises TableNotMatchingInstance: if either file is not of the expected table type.
        """
        self.meta_csv_doc = meta_in
        self.meta_output_dir = meta_out_dir
        self.cits_csv_doc = cits_in
        self.cits_output_dir = cits_out_dir
        self.strict_sequentiality = strict_sequentiality  # if True, runs the check on transitive closure if and only if the other checks passed without errors
        self.verbose = verbose
        self.log_file = log_file
        configure_logging(verbose, log_file)
        logger.info("Initializing ClosureValidator: meta='%s', cits='%s'", meta_in, cits_in)

        script_dir = dirname(abspath(__file__))  # Directory where the script is located
        with open(join(script_dir, 'messages.yaml'), 'r', encoding='utf-8') as fm:
            self.messages = full_load(fm)

        # Define default kwargs for optional configuration of the two instances of Validator
        default_kwargs = {'use_meta_endpoint': False, 'verify_id_existence': True, 'use_lmdb': use_lmdb, 'map_size': map_size, 'cache_dir': cache_dir}

        # Merge user-provided kwargs with defaults (user-provided values win)
        meta_kwargs = {**default_kwargs, **(meta_kwargs or {})}
        cits_kwargs = {**default_kwargs, **(cits_kwargs or {})}

        # Propagate verbose and log_file to child validators; these deliberately
        # override any values present in the user-provided kwargs.
        meta_kwargs['verbose'] = verbose
        cits_kwargs['verbose'] = verbose
        meta_kwargs['log_file'] = log_file
        cits_kwargs['log_file'] = log_file

        # Create Validator instances with merged kwargs
        self.meta_validator = Validator(self.meta_csv_doc, self.meta_output_dir, **meta_kwargs)
        self.cits_validator = Validator(self.cits_csv_doc, self.cits_output_dir, **cits_kwargs)

        self.helper = Helper()
        self.memory_efficient = use_lmdb

        # Position caches are built lazily in check_closure(); initialising them
        # here makes close() safe even if check_closure() is never reached.
        self._meta_positions_cache = None
        self._cits_positions_cache = None

        # Check if each of the two Validator instances is passed the expected table type
        if self.meta_validator.table_to_process != 'meta_csv':
            raise TableNotMatchingInstance(self.meta_csv_doc, self.meta_validator.table_to_process, 'meta_csv')
        if self.cits_validator.table_to_process != 'cits_csv':
            raise TableNotMatchingInstance(self.cits_csv_doc, self.cits_validator.table_to_process, 'cits_csv')

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensures caches are properly closed."""
        self.close()
        return False  # never suppress exceptions

    def close(self):
        """Close caches and clean up resources. Safe to call more than once."""
        if hasattr(self, 'meta_validator'):
            self.meta_validator.close()
        if hasattr(self, 'cits_validator'):
            self.cits_validator.close()
        if hasattr(self, '_meta_positions_cache') and self._meta_positions_cache is not None:
            self._meta_positions_cache.close()
            self._meta_positions_cache = None
        if hasattr(self, '_cits_positions_cache') and self._cits_positions_cache is not None:
            self._cits_positions_cache.close()
            self._cits_positions_cache = None

    def check_closure(self) -> tuple[bool, bool]:
        """
        Check transitive closure between META-CSV and CITS-CSV.
        Reuses the Union-Finds populated during pass 1.
        Only position caches are built here (from the stored data caches).

        :return: ``(meta_is_valid_closure, cits_is_valid_closure)``.
        :rtype: tuple[bool, bool]
        """
        print('Checking transitive closure between metadata and citations...')
        logger.info("Checking transitive closure between metadata and citations")
        meta_is_valid_closure = True
        cits_is_valid_closure = True

        # Reuse UFs and data caches from pass 1
        meta_uf = self.meta_validator._uf
        cits_uf = self.cits_validator._uf
        meta_cache = self.meta_validator.duplicate_data_cache
        cits_cache = self.cits_validator.duplicate_data_cache

        # Only position caches are created here
        if self.memory_efficient:
            cache_base = self.meta_validator._cache_dir or '.'
            self._meta_positions_cache = LmdbCache('closure_meta_positions', base_dir=cache_base, map_size=self.meta_validator.map_size)
            self._cits_positions_cache = LmdbCache('closure_cits_positions', base_dir=cache_base, map_size=self.cits_validator.map_size)
        else:
            self._meta_positions_cache = InMemoryCache('closure_meta_positions')
            self._cits_positions_cache = InMemoryCache('closure_cits_positions')
        self._meta_positions_cache.open()
        self._cits_positions_cache.open()

        try:
            # --- Build position cache from META data cache ---
            # Maps each ID to the list of {row: {field: [item positions]}} entries
            # where it appears, so closure errors can point at exact table cells.
            for str_idx, id_value in meta_cache.items():
                row_idx = int(str_idx)
                if id_value:
                    ids = id_value.split(' ')
                    ids_unique = list(set(i for i in ids if i))
                    if not ids_unique:
                        continue
                    pos_entry = {row_idx: {'id': list(range(len(ids)))}}
                    for item in ids_unique:
                        existing = self._meta_positions_cache.get(item)
                        if existing is None:
                            self._meta_positions_cache[item] = [pos_entry]
                        else:
                            existing.append(pos_entry)
                            # re-assign so LMDB-backed caches persist the mutation
                            self._meta_positions_cache[item] = existing

            # --- Build position cache from CITS data cache ---
            for str_idx, (citing_id_str, cited_id_str) in cits_cache.items():
                row_idx = int(str_idx)
                for id_value, field_name in (
                    (citing_id_str, 'citing_id'),
                    (cited_id_str, 'cited_id'),
                ):
                    if id_value:
                        ids = id_value.split(' ')
                        ids_unique = list(set(i for i in ids if i))
                        if not ids_unique:
                            continue
                        pos_entry = {row_idx: {field_name: list(range(len(ids)))}}
                        for item in ids_unique:
                            existing = self._cits_positions_cache.get(item)
                            if existing is None:
                                self._cits_positions_cache[item] = [pos_entry]
                            else:
                                existing.append(pos_entry)
                                self._cits_positions_cache[item] = existing

            # --- Check META entities that have no citations ---
            # An entity is "missing citations" when ALL of its IDs are absent from cits_positions_cache.
            # We check membership directly in LMDB (O(1) per lookup) — no large Python sets needed.
            with JSONLStreamIO(self.meta_validator.output_fp_json, 'a') as meta_json_file:
                for _root, br_ids_set in meta_uf.iter_components():
                    if all(id_ not in self._cits_positions_cache for id_ in br_ids_set):
                        table: dict = {}
                        for id_ in br_ids_set:
                            for pos_dict in (self._meta_positions_cache.get(id_) or []):
                                table.update(pos_dict)
                        if table:
                            meta_json_file.write(
                                self.helper.create_error_dict(
                                    validation_level='csv_wellformedness',
                                    error_type='error',
                                    message=self.messages['m24'],
                                    error_label='missing_citations',
                                    located_in='row',
                                    table=table,
                                )
                            )
                            meta_is_valid_closure = False

            # --- Check CITS entities that have no metadata ---
            with JSONLStreamIO(self.cits_validator.output_fp_json, 'a') as cits_json_file:
                for _root, br_ids_set in cits_uf.iter_components():
                    if all(id_ not in self._meta_positions_cache for id_ in br_ids_set):
                        table = {}
                        for id_ in br_ids_set:
                            for pos_dict in (self._cits_positions_cache.get(id_) or []):
                                table.update(pos_dict)
                        if table:
                            cits_json_file.write(
                                self.helper.create_error_dict(
                                    validation_level='csv_wellformedness',
                                    error_type='error',
                                    message=self.messages['m25'],
                                    error_label='missing_metadata',
                                    located_in='row',
                                    table=table,
                                )
                            )
                            cits_is_valid_closure = False

        finally:
            # Close AND null the position caches: close() runs unconditionally
            # afterwards (from validate()'s finally / __exit__), and resetting to
            # None prevents it from closing these caches a second time.
            self._meta_positions_cache.close()
            self._meta_positions_cache = None
            self._cits_positions_cache.close()
            self._cits_positions_cache = None

        # Write human-readable validation summaries for both tables
        textual_report_stream_meta = self.helper.create_validation_summary_stream(self.meta_validator.output_fp_json)
        textual_report_stream_cits = self.helper.create_validation_summary_stream(self.cits_validator.output_fp_json)

        with open(self.meta_validator.output_fp_txt, "w", encoding='utf-8') as fm:
            for lm in textual_report_stream_meta:
                fm.write(lm)
        with open(self.cits_validator.output_fp_txt, "w", encoding='utf-8') as fc:
            for lc in textual_report_stream_cits:
                fc.write(lc)

        logger.info("Closure check complete: meta_valid=%s, cits_valid=%s", meta_is_valid_closure, cits_is_valid_closure)

        return (meta_is_valid_closure, cits_is_valid_closure)

    def validate(self) -> tuple[bool, bool]:
        """
        Run the full validation pipeline on both META-CSV and CITS-CSV.

        First validates each table individually, then (unless
        ``strict_sequentiality`` is ``True`` and errors were found) checks the
        transitive closure between the two.

        :return: A two-element tuple ``(meta_is_valid, cits_is_valid)``.
        :rtype: tuple[bool, bool]
        """
        try:
            # Run single validation for META-CSV and CITS-CSV
            logger.info("Running individual validation of META-CSV and CITS-CSV")
            meta_is_valid = self.meta_validator.validate()
            cits_is_valid = self.cits_validator.validate()
            logger.info("Individual validation complete: meta_valid=%s, cits_valid=%s", meta_is_valid, cits_is_valid)

            # in case some errors have already been found and strict_sequentiality is True, don't run the check on closure
            if self.strict_sequentiality:
                if not meta_is_valid or not cits_is_valid:
                    print('The separate validation of the metadata (META-CSV) and citations (CITS-CSV) tables already detected some error (in one or both documents).')
                    print('Skipping the check of transitive closure as strict_sequentiality==True.')
                    logger.info("Skipping closure check due to strict_sequentiality: meta_valid=%s, cits_valid=%s", meta_is_valid, cits_is_valid)
                    return (meta_is_valid, cits_is_valid)

            # Run validation for transitive closure
            meta_is_valid_closure, cits_is_valid_closure = self.check_closure()

            # A table is valid only if both its individual and closure checks passed
            final_meta = bool(meta_is_valid_closure and meta_is_valid)
            final_cits = bool(cits_is_valid_closure and cits_is_valid)
            logger.info("ClosureValidator final result: meta_valid=%s, cits_valid=%s", final_meta, final_cits)
            return (final_meta, final_cits)
        finally:
            logger.info("ClosureValidator process finished. Cleaning up resources...")
            self.close()
            logger.info("ClosureValidator resources cleaned up.")

1256 logger.info("ClosureValidator resources cleaned up.") 

1257 

1258 

if __name__ == '__main__':
    # Command-line entry point: validate a single META-CSV or CITS-CSV file.
    parser = ArgumentParser()
    parser.add_argument('-i', '--input', dest='input_csv', required=True,
                        help='The path to the CSV document to validate.', type=str)
    parser.add_argument('-o', '--output', dest='output_dir', required=True,
                        help='The path to the directory where to store the output JSON-L file.', type=str)
    parser.add_argument('-m', '--use-meta', dest='use_meta_endpoint', action='store_true',
                        help='Use the OC Meta endpoint to check if an ID exists.', required=False)
    parser.add_argument('-s', '--no-id-existence', dest='verify_id_existence', action='store_false',
                        help='Skip checking if IDs are registered somewhere, i.e. do not use Meta endpoint nor external APIs.',
                        required=False)
    # NOTE: help text fixed — the flag's actual default is False, not True.
    parser.add_argument('--use-lmdb', dest='use_lmdb', action='store_true',
                        default=False,
                        help='Enable LMDB for efficient memory usage with large files (default: False).')
    parser.add_argument('--map-size', dest='map_size', type=int, default=1,
                        help='LMDB map size in GiB (default: 1).',
                        required=False)
    parser.add_argument('--cache-dir', dest='cache_dir', type=str, default=None,
                        help='Base directory under which all LMDB caches are created.',
                        required=False)
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
                        default=False,
                        help='Enable verbose logging output.')
    parser.add_argument('--log-file', dest='log_file', type=str, default=None,
                        help='Write logs to this file instead of the terminal.',
                        required=False)
    args = parser.parse_args()
    v = Validator(
        args.input_csv,
        args.output_dir,
        use_meta_endpoint=args.use_meta_endpoint,
        verify_id_existence=args.verify_id_existence,
        use_lmdb=args.use_lmdb,
        map_size=args.map_size * 1024**3,  # CLI takes GiB; Validator expects bytes
        cache_dir=args.cache_dir,
        verbose=args.verbose,
        log_file=args.log_file,
    )
    try:
        v.validate()
    finally:
        # Always release caches/LMDB resources, mirroring ClosureValidator.validate().
        v.close()

1298 

1299# to instantiate the class, write: 

1300# v = Validator('path/to/csv/file', 'output/dir/path') # optionally set use_meta_endpoint to True and/or verify_id_existence to False 

1301# v.validate() --> validates, returns the output, and saves files 

1302 

1303 

1304# FROM THE COMMAND LINE: 

1305# python -m oc_validator.main -i <input csv file path> -o <output dir path> [-m] [-s] [--use-lmdb [--cache-dir <dir>] [--map-size <GiB>]]