Coverage for oc_validator / csv_wellformedness.py: 99%

243 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 15:46 +0000

1# ISC License 

2# 

3# Copyright (c) 2023-2026, Elia Rizzetto, Silvio Peroni 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any 

6# purpose with or without fee is hereby granted, provided that the above 

7# copyright notice and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, 

12# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM 

13# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR 

14# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR 

15# PERFORMANCE OF THIS SOFTWARE. 

16 

17from re import match, search, sub, findall 

18from roman import fromRoman, InvalidRomanNumeralError 

19from oc_validator.helper import Helper 

20from oc_validator.lmdb_cache import LmdbCache, LmdbUnionFind, InMemoryCache, InMemoryUnionFind 

21from json import load 

22from os.path import join, dirname, abspath 

23from typing import Generator, List, Union 

24 

25class Wellformedness: 

26 """ 

27 Provides well-formedness checks for every field of META-CSV and CITS-CSV rows. 

28 

29 Each method validates the format of a single field type (IDs, dates, venues, 

30 pages, etc.) against the OpenCitations CSV specification. 

31 """ 

32 

def __init__(self) -> None:
    """
    Initialise the Wellformedness checker and load the ID-type alignment configuration.

    :rtype: None
    """
    self.helper = Helper()
    # Recognised ID schemes for bibliographic resources, for venues,
    # and for responsible agents, respectively.
    self.br_id_schemes = ['doi', 'issn', 'isbn', 'pmid', 'pmcid', 'url', 'wikidata', 'wikipedia', 'openalex', 'temp', 'local', 'omid', 'jid', 'arxiv']
    self.br_id_schemes_for_venues = ['doi', 'issn', 'isbn', 'pmid', 'pmcid', 'url', 'wikidata', 'wikipedia', 'openalex', 'omid', 'jid', 'arxiv']
    self.ra_id_schemes = ['crossref', 'orcid', 'viaf', 'wikidata', 'ror', 'omid']
    # The alignment file ships next to this module.
    config_path = join(dirname(abspath(__file__)), 'id_type_alignment.json')
    with open(config_path, 'r', encoding='utf-8') as config_file:
        self.id_type_dict = load(config_file)

45 

46 

def wellformedness_br_id(self, id_element: str) -> bool:
    """
    Validate the well-formedness of a single bibliographic-resource ID element.

    Checks that the element matches ``<scheme>:<value>`` where *scheme* is one
    of the recognised bibliographic-resource ID schemes and *value* is a
    non-empty run of non-whitespace characters.

    :param id_element: A single ID string (e.g. ``"doi:10.1234/abc"``).
    :type id_element: str
    :return: ``True`` if the element is well-formed, ``False`` otherwise.
    :rtype: bool
    """
    scheme_alternation = "|".join(self.br_id_schemes)
    return match(fr'^({scheme_alternation}):\S+$', id_element) is not None

64 

def wellformedness_people_item(self, ra_item: str) -> bool:
    """
    Validate the well-formedness of an item inside the ``author`` or ``editor`` field.

    An item may be a name, a name followed by bracketed IDs, or just bracketed IDs,
    conforming to the META-CSV syntax.

    :param ra_item: The raw string of a single author/editor item.
    :type ra_item: str
    :return: ``True`` if well-formed, ``False`` otherwise.
    :rtype: bool
    """
    # todo: create stricter regex for not allowing characters that are likely to be illegal in a person's name/surname
    # (e.g. digits, apostrophe, underscore, full-stop, etc.)
    name_part = r'(?:[^\s,;\[\]]+(?:\s[^\s,;\[\]]+)*),?(?:\s[^\s,;\[\]]+)*'
    schemes = "|".join(self.ra_id_schemes)
    ids_part = fr'\[({schemes}):\S+(?:\s({schemes}):\S+)*\]'
    # Three accepted shapes: "name [ids]", "name", "[ids]".
    item_pattern = fr'^(?:({name_part}\s{ids_part})|({name_part}\s?)|({ids_part}))$'
    return match(item_pattern, ra_item) is not None

87 

def wellformedness_publisher_item(self, ra_item: str) -> bool:
    """
    Validate the well-formedness of an item inside the ``publisher`` field.

    Unlike :meth:`wellformedness_people_item`, this allows commas in the
    name portion because publisher names may contain them.

    :param ra_item: The raw string of a single publisher item.
    :type ra_item: str
    :return: ``True`` if well-formed, ``False`` otherwise.
    :rtype: bool
    """
    name_part = r'(?:[^\s\[\]]+(?:\s[^\s\[\]]+)*)'
    schemes = "|".join(self.ra_id_schemes)
    ids_part = fr'\[({schemes}):\S+(?:\s({schemes}):\S+)*\]'
    # Three accepted shapes: "name [ids]", "name", "[ids]".
    item_pattern = fr'^(?:({name_part}\s{ids_part})|({name_part}\s?)|({ids_part}))$'
    return match(item_pattern, ra_item) is not None

108 

def orphan_ra_id(self, ra_item: str) -> bool:
    """
    Detect responsible-agent IDs that appear outside square brackets.

    Returns ``True`` if the input string is likely to contain one or more
    RA IDs not enclosed in ``[]``, which would indicate a formatting issue.

    :param ra_item: The item inside an RA field, as split by the ``'; '`` separator.
    :type ra_item: str
    :return: ``True`` if an orphan ID is found (likely not well-formed),
        ``False`` if no match is found.
    :rtype: bool
    """
    # Strip the bracketed portion, then look for a scheme prefix in the rest.
    outside_brackets = sub(r'\[.*\]', '', ra_item)
    return search(fr'({"|".join(self.ra_id_schemes)}):', outside_brackets) is not None

126 

def wellformedness_date(self, date_field: str) -> bool:
    """
    Validate the well-formedness of a date string.

    Accepted formats are ``YYYY`` or ``YYYY-MM`` or ``YYYY-MM-DD``.
    Month must be 01-12 and day 01-31; calendar validity (e.g. Feb 30)
    is not checked here.

    :param date_field: The raw date string from a date field.
    :type date_field: str
    :return: ``True`` if the date is well-formed, ``False`` otherwise.
    :rtype: bool
    """
    date_pattern = r'^((?:\d{4}\-(?:0[1-9]|1[012])(?:\-(?:0[1-9]|[12][0-9]|3[01]))?)|(?:\d{4}))$'
    return match(date_pattern, date_field) is not None

143 

def wellformedness_venue(self, venue_value: str) -> bool:
    """
    Validate the well-formedness of the ``venue`` field value.

    The venue may be a name, a name followed by bracketed IDs, or just
    bracketed IDs, using bibliographic-resource ID schemes.

    NOTE(review): an original comment stated that pmcids are not valid
    venue identifiers, yet ``br_id_schemes_for_venues`` (set in ``__init__``)
    does include ``'pmcid'`` — confirm which is intended.

    :param venue_value: The raw venue string.
    :type venue_value: str
    :return: ``True`` if well-formed, ``False`` otherwise.
    :rtype: bool
    """
    name_part = r'(?:[^\s\[\]]+(?:\s[^\s\[\]]+)*)'
    schemes = "|".join(self.br_id_schemes_for_venues)
    ids_part = fr'\[({schemes}):\S+(?:\s({schemes}):\S+)*\]'
    # Three accepted shapes: "name [ids]", "name", "[ids]".
    venue_pattern = fr'^(?:({name_part}\s{ids_part})|({name_part}\s?)|({ids_part}))$'
    return match(venue_pattern, venue_value) is not None

165 

def orphan_venue_id(self, venue_value: str) -> bool:
    """
    Detect venue IDs that appear outside square brackets.

    Returns ``True`` if the input string likely contains one or more
    bibliographic-resource IDs not enclosed in ``[]``.

    :param venue_value: The raw value of the ``venue`` field.
    :type venue_value: str
    :return: ``True`` if an orphan ID is found, ``False`` otherwise.
    :rtype: bool
    """
    # Strip the bracketed portion, then look for a scheme prefix in the rest.
    outside_brackets = sub(r'\[.*\]', '', venue_value)
    scheme_prefix = fr'({"|".join(self.br_id_schemes_for_venues)}):'
    return search(scheme_prefix, outside_brackets) is not None

182 

def wellformedness_volume_issue(self, vi_value: str) -> bool:
    """
    Validate the well-formedness of a ``volume`` or ``issue`` field value.

    The value must be one or more non-whitespace tokens separated by single spaces
    (no leading, trailing, or doubled whitespace).

    :param vi_value: The raw volume or issue string.
    :type vi_value: str
    :return: ``True`` if well-formed, ``False`` otherwise.
    :rtype: bool
    """
    return match(r'^\S+(?:\s\S+)*$', vi_value) is not None

200 

def wellformedness_page(self, page_value: str) -> bool:
    """
    Validate the well-formedness of the ``page`` field value.

    Accepts numeric ranges (``1-10``), Roman numeral ranges (``i-x``),
    and alphanumeric page ranges (``a1-b2``).

    :param page_value: The raw page string.
    :type page_value: str
    :return: ``True`` if well-formed, ``False`` otherwise.
    :rtype: bool
    """
    # todo: create stricter regex for roman numerals and valid intervals
    # NB: incorrect roman numerals and impossible ranges (e.g. 200-20) still validate!
    arabic = r'([1-9][0-9]*)'
    roman = r'([IiVvXxLlCcDdMm]+)'
    alphanum = r'((?:(?:[A-Za-z]|[α-ωΑ-Ω])?[1-9]\d*)|(?:[1-9]\d*(?:[A-Za-z]|[α-ωΑ-Ω])?))'
    plain_range = f'^(?:{arabic}|{roman})-(?:{arabic}|{roman})$'
    alphanum_range = f'^{alphanum}-{alphanum}$'
    # A value is accepted if it matches either range form.
    return match(plain_range, page_value) is not None or match(alphanum_range, page_value) is not None

227 

def check_page_interval(self, page_interval: str) -> bool:
    """
    Validate that the page interval is logically consistent.

    Verifies that the start page is less than or equal to the end page.
    Handles Arabic numerals, Roman numerals, and alphanumeric strings.

    NOTE(review): assumes *page_interval* contains a ``-`` separating exactly
    two bounds (as guaranteed when :meth:`wellformedness_page` passed first);
    a string without a dash would raise IndexError on ``both_num[1]`` —
    confirm callers always validate well-formedness before calling this.

    :param page_interval: The value of the ``page`` field (e.g. ``"1-10"``).
    :type page_interval: str
    :return: ``True`` if the interval is valid or cannot be converted to
        integers, ``False`` if the interval is definitively invalid.
    :rtype: bool
    """

    def extract_segments(text: str):
        # Split a mixed string into its alphabetic and numeric runs,
        # e.g. 'a12' -> (['a'], ['12']).
        letters = findall(r'[a-zA-Z]+', text)
        numbers = findall(r'\d+', text)
        return letters, numbers

    both_num = page_interval.split('-')
    converted = []
    for num_str in both_num:
        if num_str.isnumeric():
            converted.append(int(num_str))
        else:
            # Not a plain integer: try to read the bound as a Roman numeral.
            try:
                converted.append(fromRoman(num_str.upper()))
            except InvalidRomanNumeralError:
                # Neither Arabic nor Roman: fall back to alphanumeric handling.
                # Every path below returns, so the loop never resumes here.
                if both_num[0] == both_num[1]:
                    return True  # ignore cases with identical alphanumeric strings (e.g. "a12-a12")

                elif both_num[0].isalnum() and both_num[1].isalnum():
                    alph1, num1 = extract_segments(both_num[0])
                    alph2, num2 = extract_segments(both_num[1])
                    if [l for l in (alph1, num1, alph2, num2) if len(l)>1]:
                        return False  # exclude strs with non-contiguous alphabetic segments (e.g. 'a123b-c456')
                    # Each bound now has at most one alphabetic and one numeric
                    # segment; missing parts default to '' / 0 for comparison.
                    char1 = alph1[0].lower() if alph1 else ''
                    char2 = alph2[0].lower() if alph2 else ''
                    dig1 = int(num1[0]) if num1 else 0
                    dig2 = int(num2[0]) if num2 else 0
                    # Valid when the letters agree (or only the start has one)
                    # and the numeric parts are non-decreasing.
                    if ((char1 == char2) or (char1 and not char2)) and (dig1 <= dig2):
                        return True
                    return False
                else:
                    return False

    # Both bounds converted to integers (Arabic or Roman): compare directly.
    if converted[0] <= converted[1]:
        return True
    else:
        return False

278 

def check_duplicate_ra_by_id(self, items: List) -> List[List[int]]:
    """
    Find in-field duplicates among author/editor items based on shared RA IDs.

    Two items are considered duplicates when they share at least one
    responsible-agent identifier (e.g. ``orcid:0000-0001``).

    :param items: List of :class:`~oc_validator.table_reader.AgentItem` objects.
    :type items: List
    :return: A list of duplicate groups. Each group is a sorted list of item
        indices that share at least one RA ID. An empty list means no duplicates.
    :rtype: List[List[int]]
    """
    # Map each identifier to the positions of the items that carry it.
    by_pid: dict = {}
    for position, agent in enumerate(items):
        for pid in agent.ids:
            by_pid.setdefault(pid, []).append(position)

    groups_seen: set = set()
    duplicates: List[List[int]] = []
    for positions in by_pid.values():
        if len(positions) < 2:
            continue
        # Deduplicate and normalise so the same group is reported once.
        group_key = tuple(sorted(set(positions)))
        if group_key not in groups_seen:
            groups_seen.add(group_key)
            duplicates.append(list(group_key))

    return duplicates

307 

def check_duplicate_publisher_by_raw(self, items: List) -> List[List[int]]:
    """
    Find in-field duplicates among publisher items based on raw string exact match.

    Two publisher items are considered duplicates when their raw string
    representations are identical.

    :param items: List of :class:`~oc_validator.table_reader.AgentItem` objects.
    :type items: List
    :return: A list of duplicate groups. Each group is a list of item indices
        whose raw strings are identical. An empty list means no duplicates.
    :rtype: List[List[int]]
    """
    # Group item positions by their raw string representation.
    occurrences: dict = {}
    for position, publisher in enumerate(items):
        occurrences.setdefault(publisher._raw, []).append(position)

    # Only raw strings seen at more than one position are duplicates.
    return [positions for positions in occurrences.values() if len(positions) >= 2]

331 

def wellformedness_type(self, type_value: str) -> bool:
    """
    Validate the well-formedness of the ``type`` field value.

    The type must be one of the keys in the ID-type alignment dictionary
    loaded in ``__init__``.

    :param type_value: The raw type string.
    :type type_value: str
    :return: ``True`` if the type is recognised, ``False`` otherwise.
    :rtype: bool
    """
    return type_value in self.id_type_dict

348 

def get_missing_values(self, row: dict) -> dict:
    """
    Check whether a row has all required fields for its resource type.

    When the ``id`` field is empty or contains only ``temp:``/``local:`` IDs,
    certain other fields become mandatory depending on the ``type`` value.
    The returned dictionary maps field names to ``[0]`` (for fields that
    condition the requirement) or ``None`` (for missing fields).

    Fixes over the previous revision: the loop variable no longer shadows the
    ``id`` builtin, and the title/pub_date/author-editor requirement block —
    previously duplicated verbatim for recognised and unrecognised types —
    is factored into a single inner helper.

    :param row: A dictionary representing a single CSV row.
    :type row: dict
    :return: Dictionary locating missing required fields. Empty if the row
        satisfies all requirements.
    :rtype: dict
    """
    # TODO: Consider using an external config file, as you do for checking
    # id-type semantic alignment, since the list of accepted types might
    # change/be extended frequently!

    missing = {}

    def require_core_metadata(type_marker) -> None:
        # Shared rule: title, pub_date and at least one of author/editor are
        # mandatory. 'type' is flagged with *type_marker*: [0] when a
        # recognised type conditions the requirement, None when the type
        # itself is not among the accepted values.
        if not row['title']:
            missing['type'] = type_marker
            missing['title'] = None
        if not row['pub_date']:
            missing['type'] = type_marker
            missing['pub_date'] = None
        if not row['author'] and not row['editor']:
            missing['type'] = type_marker
            missing['author'] = None
            missing['editor'] = None

    row_ids = row['id'].split(' ')
    internal_only_id = all(pid.startswith('temp:') or pid.startswith('local:') for pid in row_ids)
    if not row['id'] or internal_only_id:  # ID value is missing or only temp/local IDs are specified

        if row['type']:  # ID is missing and 'type' is specified

            if row['type'] in {'book', 'dataset', 'data file', 'dissertation', 'edited book',
                               'journal article', 'monograph', 'other', 'peer review', 'posted content',
                               'web content', 'proceedings article', 'reference book', 'report'}:
                require_core_metadata([0])

            elif row['type'] in {'book chapter', 'book part', 'book section', 'book track', 'component',
                                 'reference entry'}:
                # Container-level metadata required: title and venue.
                if not row['title']:
                    missing['type'] = [0]
                    missing['title'] = None
                if not row['venue']:
                    missing['type'] = [0]
                    missing['venue'] = None

            elif row['type'] in {'book series', 'book set', 'journal', 'proceedings', 'proceedings series',
                                 'report series', 'standard', 'standard series'}:
                # Series-level resources only require a title.
                if not row['title']:
                    missing['type'] = [0]
                    missing['title'] = None

            elif row['type'] == 'journal issue':
                if not row['venue']:
                    missing['type'] = [0]
                    missing['venue'] = None
                if not row['title'] and not row['issue']:
                    missing['type'] = [0]
                    missing['title'] = None
                    missing['issue'] = None

            elif row['type'] == 'journal volume':
                if not row['venue']:
                    missing['type'] = [0]
                    missing['venue'] = None
                if not row['title'] and not row['volume']:
                    missing['type'] = [0]
                    missing['title'] = None
                    missing['volume'] = None

            else:
                # 'type' is specified but not among the accepted values:
                # same core requirements, but 'type' itself is flagged missing.
                require_core_metadata(None)

    # The 2 conditions below apply to any type of BR and regardless of an ID
    # being specified; cfr. also docs/mandatory_fields.csv
    if row['volume'] and not row['venue']:
        missing['volume'] = [0]
        missing['venue'] = None

    if row['issue'] and not row['venue']:
        missing['issue'] = [0]
        missing['venue'] = None

    return missing

456 

457 # # THIS FUNCTION IS THE OLD FUNCTION TO GET DUPLICATES, KEPT HERE FOR REFERENCE. 

458 # def get_duplicates_cits(self, entities: list, data_dict: list, messages) -> list: 

459 # """ 

460 # Creates a list of dictionaries containing the duplication error in the whole document, either within a row 

461 # (self-citation) or between two or more rows (duplicate citations). 

462 # :param entities: list containing sets of strings (the IDs), where each set corresponds to a bibliographic entity 

463 # :param data_dict: the list of the document's rows, read as dictionaries 

464 # :param messages: the dictionary containing the messages as they're read from the .yaml config file 

465 # :return: list of dictionaries, each carrying full info about each duplication error within the document. 

466 # """ 

467 # visited_dicts = [] 

468 # report = [] 

469 # for row_idx, row in enumerate(data_dict): 

470 # citation = {'citing_id': '', 'cited_id': ''} 

471 

472 # citing_items = row['citing_id'].split(' ') 

473 # for item in citing_items: 

474 # if citation['citing_id'] == '': 

475 # for set_idx, set in enumerate(entities): 

476 # if item in set: # mapping the single ID to its corresponding set representing the bibl. entity 

477 # citation['citing_id'] = set_idx 

478 # break 

479 

480 # cited_items = row['cited_id'].split(' ') 

481 # for item in cited_items: 

482 # if citation['cited_id'] == '': 

483 # for set_idx, set in enumerate(entities): 

484 # if item in set: # mapping the single ID to its corresponding set representing the bibl. entity 

485 # citation['cited_id'] = set_idx 

486 # break 

487 

488 # # If a field contains only invalid items, it is not possible to map it to an entity set: process the row 

489 # # only if both citing and cited are associated to an entity set, i.e. their value in the 'citation' 

490 # # dictionary is not still an empty string (as it had been initialized). 

491 # if citation['citing_id'] != '' and citation['cited_id'] != '': 

492 

493 # if citation['citing_id'] == citation['cited_id']: # SELF-CITATION warning (an entity cites itself) 

494 # table = { 

495 # row_idx: { 

496 # 'citing_id': [idx for idx in range(len(citing_items))], 

497 # 'cited_id': [idx for idx in range(len(cited_items))] 

498 # } 

499 # } 

500 # message = messages['m4'] 

501 # report.append( 

502 # self.helper.create_error_dict(validation_level='csv_wellformedness', error_type='warning', 

503 # message=message, error_label='self-citation', located_in='field', 

504 # table=table, valid=True)) 

505 

506 # # SAVE CITATIONS BETWEEN ENTITIES IN A LIST. 

507 # # Each citation is represented as a nested dictionary in which the key-values representing the entity-to-entity 

508 # # citation are unique within the list, but the table representing the location of an INSTANCE of an 

509 # # entity-to-entity citation is updated each time a new instance of such citation is found in the csv document. 

510 

511 # citation_table = { 

512 # row_idx: { 

513 # 'citing_id': [idx for idx in range(len(citing_items))], 

514 # 'cited_id': [idx for idx in range(len(cited_items))] 

515 # } 

516 # } 

517 

518 # cit_info = {'citation': citation, 'table': citation_table} 

519 

520 # if not visited_dicts: # just for the first round of the iteration (when visited_dicts is empty) 

521 # visited_dicts.append(cit_info) 

522 # else: 

523 # for dict_idx, cit_dict in enumerate(visited_dicts): 

524 # if citation == cit_dict['citation']: 

525 # visited_dicts[dict_idx]['table'].update(cit_info['table']) 

526 # break 

527 # elif dict_idx == (len(visited_dicts) - 1): 

528 # visited_dicts.append(cit_info) 

529 

530 # for d in visited_dicts: 

531 # if len(d['table']) > 1: # if there's more than 1 row in table for a citation (duplicate rows error) 

532 # table = d['table'] 

533 # message = messages['m5'] 

534 

535 # report.append( 

536 # self.helper.create_error_dict(validation_level='csv_wellformedness', error_type='error', 

537 # message=message, error_label='duplicate_citation', located_in='row', 

538 # table=table)) 

539 # return report 

540 

def get_duplicates_cits(self, uf: Union[LmdbUnionFind, InMemoryUnionFind], data_cache: Union[LmdbCache, InMemoryCache], messages: dict) -> Generator:
    """
    Find duplicate citations and self-citations in a CITS-CSV document.

    No new large structures are held in RAM: the citation-occurrence map is
    persisted in a temporary cache and iterated at the end to detect duplicates.

    :param uf: Union-Find populated with all IDs encountered during validation.
    :type uf: Union[LmdbUnionFind, InMemoryUnionFind]
    :param data_cache: Cache mapping ``str(row_idx)`` to a
        ``(citing_id_str, cited_id_str)`` tuple for every row.
    :type data_cache: Union[LmdbCache, InMemoryCache]
    :param messages: Error-message template dictionary (from ``messages.yaml``).
    :type messages: dict
    :return: Generator of error-dict objects.
    :rtype: Generator
    """
    # citation map layout: "citing_root\x00cited_root" ->
    # {row_idx: {'citing_id': [...], 'cited_id': [...]}}.
    # Mirror the backend of data_cache: in-memory validation stays in
    # memory, LMDB-backed validation spills to disk.
    backend = InMemoryCache if isinstance(data_cache, InMemoryCache) else LmdbCache

    with backend('dup_cits_citation_map') as citation_map_cache:
        for str_idx, (citing_id, cited_id) in data_cache.items():
            row_idx = int(str_idx)
            citing_items = citing_id.split(' ')
            cited_items = cited_id.split(' ')

            # Resolve the first registered ID on each side to its entity root.
            citing_root = next(
                (uf.find(item) for item in citing_items if item in uf), None
            )
            cited_root = next(
                (uf.find(item) for item in cited_items if item in uf), None
            )

            if citing_root is None or cited_root is None:
                continue  # row has no mappable entities — skip

            # Location of this citation occurrence: all item positions in
            # both fields of this row.
            occurrence = {
                row_idx: {
                    'citing_id': list(range(len(citing_items))),
                    'cited_id': list(range(len(cited_items))),
                }
            }

            # SELF-CITATION: citing and cited resolve to the same entity.
            if citing_root == cited_root:
                yield self.helper.create_error_dict(
                    validation_level='csv_wellformedness',
                    error_type='warning',
                    message=messages['m4'],
                    error_label='self-citation',
                    located_in='field',
                    table=occurrence,
                    valid=True,
                )

            # Accumulate citation occurrences (read-modify-write).
            cit_key = f'{citing_root}\x00{cited_root}'
            stored = citation_map_cache.get(cit_key)
            if stored is None:
                citation_map_cache[cit_key] = occurrence
            else:
                stored.update(occurrence)
                citation_map_cache[cit_key] = stored

        # Second scan: citations recorded in more than one row are duplicates.
        for _cit_key, table in citation_map_cache.items():
            if len(table) > 1:
                yield self.helper.create_error_dict(
                    validation_level='csv_wellformedness',
                    error_type='error',
                    message=messages['m5'],
                    error_label='duplicate_citation',
                    located_in='row',
                    table=table,
                )

629 

630 # # THIS FUNCTION IS THE OLD FUNCTION TO GET DUPLICATES, KEPT HERE FOR REFERENCE. 

631 # def get_duplicates_meta(self, entities: list, data_dict: list, messages) -> list: 

632 # """ 

633 # Creates a list of dictionaries containing the duplication error in the whole document between two or more rows. 

634 # :param entities: list containing sets of strings (the IDs), where each set corresponds to a bibliographic entity. 

635 # :param data_dict: the list of the document's rows, read as dictionaries 

636 # :param messages: the dictionary containing the messages as they're read from the .yaml config file 

637 # :return: list of dictionaries, each carrying full info about each duplication error within the document. 

638 # """ 

639 # visited_dicts = [] 

640 # report = [] 

641 # for row_idx, row in enumerate(data_dict): 

642 # br = {'meta_id': None, 'table': {}} 

643 # items = row['id'].split(' ') 

644 

645 # for item in items: 

646 # if not br['meta_id']: 

647 # for set_idx, set in enumerate(entities): 

648 # if item in set: # mapping the single ID to its corresponding set representing the bibl. entity 

649 # br['meta_id'] = str(set_idx) 

650 # br['table'] = {row_idx: {'id': list(range(len(items)))}} 

651 # break 

652 

653 # # process row only if a meta_id has been associated to it (i.e. id field contains at least one valid identifier) 

654 # if br['meta_id']: 

655 # if not visited_dicts: # just for the first round of the iteration (when visited_dicts is empty) 

656 # visited_dicts.append(br) 

657 # else: 

658 # for visited_br_idx, visited_br in enumerate(visited_dicts): 

659 # if br['meta_id'] == visited_br['meta_id']: 

660 # visited_dicts[visited_br_idx]['table'].update(br['table']) 

661 # break 

662 # elif visited_br_idx == (len(visited_dicts) - 1): 

663 # visited_dicts.append(br) 

664 

665 # for d in visited_dicts: 

666 # if len(d['table']) > 1: # if there's more than 1 row in table for a br (duplicate rows error) 

667 # table = d['table'] 

668 # message = messages['m11'] 

669 

670 # report.append( 

671 # self.helper.create_error_dict(validation_level='csv_wellformedness', error_type='error', 

672 # message=message, error_label='duplicate_br', located_in='row', 

673 # table=table)) 

674 

675 # return report 

676 

def get_duplicates_meta(self, uf: Union[LmdbUnionFind, InMemoryUnionFind], data_cache: Union[LmdbCache, InMemoryCache], messages: dict) -> Generator:
    """
    Find duplicate bibliographic entities in a META-CSV document.

    No new large structures are held in RAM: the entity-occurrence map is
    persisted in a temporary cache and iterated at the end to detect duplicates.

    :param uf: Union-Find populated with all IDs encountered during validation.
    :type uf: Union[LmdbUnionFind, InMemoryUnionFind]
    :param data_cache: Cache mapping ``str(row_idx)`` to the raw ``'id'``
        field string for every row.
    :type data_cache: Union[LmdbCache, InMemoryCache]
    :param messages: Error-message template dictionary (from ``messages.yaml``).
    :type messages: dict
    :return: Generator of error-dict objects.
    :rtype: Generator
    """
    # entity map layout: entity root -> {row_idx: {'id': [0, 1, ...]}}.
    # Mirror the backend of data_cache: in-memory validation stays in
    # memory, LMDB-backed validation spills to disk.
    backend = InMemoryCache if isinstance(data_cache, InMemoryCache) else LmdbCache

    with backend('dup_meta_entity_map') as meta_map_cache:
        for str_idx, id_value in data_cache.items():
            id_items = id_value.split(' ')

            # Resolve the first registered ID to its entity root.
            entity_root = next(
                (uf.find(item) for item in id_items if item in uf), None
            )
            if entity_root is None:
                continue  # row has no valid mapped IDs — skip

            occurrence = {int(str_idx): {'id': list(range(len(id_items)))}}

            # Accumulate occurrences per entity root (read-modify-write).
            stored = meta_map_cache.get(entity_root)
            if stored is None:
                meta_map_cache[entity_root] = occurrence
            else:
                stored.update(occurrence)
                meta_map_cache[entity_root] = stored

        # Second scan: entities recorded in more than one row are duplicates.
        for _entity_root, table in meta_map_cache.items():
            if len(table) > 1:
                yield self.helper.create_error_dict(
                    validation_level='csv_wellformedness',
                    error_type='error',
                    message=messages['m11'],
                    error_label='duplicate_br',
                    located_in='row',
                    table=table,
                )