Coverage for oc_validator/csv_wellformedness.py: 99% (243 statements)
# ISC License
#
# Copyright (c) 2023-2026, Elia Rizzetto, Silvio Peroni
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.

from re import match, search, sub, findall
from roman import fromRoman, InvalidRomanNumeralError
from oc_validator.helper import Helper
from oc_validator.lmdb_cache import LmdbCache, LmdbUnionFind, InMemoryCache, InMemoryUnionFind
from json import load
from os.path import join, dirname, abspath
from typing import Generator, List, Union


class Wellformedness:
    """
    Provides well-formedness checks for every field of META-CSV and CITS-CSV rows.

    Each method validates the format of a single field type (IDs, dates, venues,
    pages, etc.) against the OpenCitations CSV specification.
    """

    def __init__(self) -> None:
        """
        Initialise the Wellformedness checker and load the ID-type alignment configuration.

        :rtype: None
        """
        self.helper = Helper()
        self.br_id_schemes = ['doi', 'issn', 'isbn', 'pmid', 'pmcid', 'url', 'wikidata', 'wikipedia', 'openalex', 'temp', 'local', 'omid', 'jid', 'arxiv']
        self.br_id_schemes_for_venues = ['doi', 'issn', 'isbn', 'pmid', 'pmcid', 'url', 'wikidata', 'wikipedia', 'openalex', 'omid', 'jid', 'arxiv']
        self.ra_id_schemes = ['crossref', 'orcid', 'viaf', 'wikidata', 'ror', 'omid']
        with open(join(dirname(abspath(__file__)), 'id_type_alignment.json'), 'r', encoding='utf-8') as fa:
            self.id_type_dict = load(fa)

    def wellformedness_br_id(self, id_element: str) -> bool:
        """
        Validate the well-formedness of a single bibliographic-resource ID element.

        Checks that the element matches ``<scheme>:<value>`` where *scheme* is one
        of the recognised bibliographic-resource ID schemes.

        :param id_element: A single ID string (e.g. ``"doi:10.1234/abc"``).
        :type id_element: str
        :return: ``True`` if the element is well-formed, ``False`` otherwise.
        :rtype: bool
        """
        id_pattern = fr'^({"|".join(self.br_id_schemes)}):\S+$'
        if match(id_pattern, id_element):
            return True
        else:
            return False
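
    # Illustrative behaviour of wellformedness_br_id() (examples added by the
    # editor, inferred from the pattern above):
    #   wellformedness_br_id('doi:10.1234/abc')   -> True
    #   wellformedness_br_id('10.1234/abc')       -> False  (no recognised scheme prefix)
    #   wellformedness_br_id('doi: 10.1234/abc')  -> False  (whitespace after the colon)
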
    def wellformedness_people_item(self, ra_item: str) -> bool:
        """
        Validate the well-formedness of an item inside the ``author`` or ``editor`` field.

        An item may be a name, a name followed by bracketed IDs, or just bracketed IDs,
        conforming to the META-CSV syntax.

        :param ra_item: The raw string of a single author/editor item.
        :type ra_item: str
        :return: ``True`` if well-formed, ``False`` otherwise.
        :rtype: bool
        """
        # todo: create stricter regex for not allowing characters that are likely to be illegal in a person's name/surname
        #  (e.g. digits, apostrophe, underscore, full-stop, etc.)
        outside_brackets = r'(?:[^\s,;\[\]]+(?:\s[^\s,;\[\]]+)*),?(?:\s[^\s,;\[\]]+)*'
        inside_brackets = fr'\[({"|".join(self.ra_id_schemes)}):\S+(?:\s({"|".join(self.ra_id_schemes)}):\S+)*\]'
        ra_item_pattern = fr'^(?:({outside_brackets}\s{inside_brackets})|({outside_brackets}\s?)|({inside_brackets}))$'

        if match(ra_item_pattern, ra_item):
            return True
        else:
            return False
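
    # Illustrative behaviour of wellformedness_people_item() (examples added by
    # the editor, inferred from the pattern above):
    #   wellformedness_people_item('Doe, John [orcid:0000-0002-1825-0097]')  -> True
    #   wellformedness_people_item('[orcid:0000-0002-1825-0097]')            -> True   (IDs only)
    #   wellformedness_people_item('Doe, John [0000-0002-1825-0097]')        -> False  (bracketed ID without scheme)
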
    def wellformedness_publisher_item(self, ra_item: str) -> bool:
        """
        Validate the well-formedness of an item inside the ``publisher`` field.

        Unlike :meth:`wellformedness_people_item`, this allows commas in the
        name portion because publisher names may contain them.

        :param ra_item: The raw string of a single publisher item.
        :type ra_item: str
        :return: ``True`` if well-formed, ``False`` otherwise.
        :rtype: bool
        """
        outside_brackets_pub = r'(?:[^\s\[\]]+(?:\s[^\s\[\]]+)*)'
        inside_brackets = fr'\[({"|".join(self.ra_id_schemes)}):\S+(?:\s({"|".join(self.ra_id_schemes)}):\S+)*\]'
        ra_item_pattern = fr'^(?:({outside_brackets_pub}\s{inside_brackets})|({outside_brackets_pub}\s?)|({inside_brackets}))$'

        if match(ra_item_pattern, ra_item):
            return True
        else:
            return False
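
    # Illustrative behaviour of wellformedness_publisher_item() (example added by
    # the editor; the ID value is purely illustrative). Commas are accepted here,
    # unlike in author/editor items:
    #   wellformedness_publisher_item('Taylor & Francis, Ltd. [crossref:301]')  -> True
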
    def orphan_ra_id(self, ra_item: str) -> bool:
        """
        Detect responsible-agent IDs that appear outside square brackets.

        Returns ``True`` if the input string is likely to contain one or more
        RA IDs not enclosed in ``[]``, which would indicate a formatting issue.

        :param ra_item: The item inside an RA field, as split by the ``'; '`` separator.
        :type ra_item: str
        :return: ``True`` if an orphan ID is found (likely not well-formed),
            ``False`` if no match is found.
        :rtype: bool
        """
        if search(fr'({"|".join(self.ra_id_schemes)}):', sub(r'\[.*\]', '', ra_item)):
            return True
        else:
            return False
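
    # Illustrative behaviour of orphan_ra_id() (examples added by the editor):
    # anything matching '<scheme>:' that survives removal of the bracketed part
    # counts as an orphan ID.
    #   orphan_ra_id('Doe, John orcid:0000-0002-1825-0097')    -> True   (ID outside brackets)
    #   orphan_ra_id('Doe, John [orcid:0000-0002-1825-0097]')  -> False
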
    def wellformedness_date(self, date_field: str) -> bool:
        """
        Validate the well-formedness of a date string.

        Accepted formats are ``YYYY`` or ``YYYY-MM`` or ``YYYY-MM-DD``.

        :param date_field: The raw date string from a date field.
        :type date_field: str
        :return: ``True`` if the date is well-formed, ``False`` otherwise.
        :rtype: bool
        """
        date_pattern = r'^((?:\d{4}\-(?:0[1-9]|1[012])(?:\-(?:0[1-9]|[12][0-9]|3[01]))?)|(?:\d{4}))$'
        if match(date_pattern, date_field):
            return True
        else:
            return False
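
    # Illustrative behaviour of wellformedness_date() (examples added by the
    # editor): the pattern checks the format only, not calendar validity.
    #   wellformedness_date('2023')        -> True
    #   wellformedness_date('2023-04-30')  -> True
    #   wellformedness_date('2023-13')     -> False  (month out of range)
    #   wellformedness_date('2023-02-30')  -> True   (format-valid even though the date does not exist)
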
    def wellformedness_venue(self, venue_value: str) -> bool:
        """
        Validate the well-formedness of the ``venue`` field value.

        The venue may be a name, a name followed by bracketed IDs, or just
        bracketed IDs, using bibliographic-resource ID schemes.

        :param venue_value: The raw venue string.
        :type venue_value: str
        :return: ``True`` if well-formed, ``False`` otherwise.
        :rtype: bool
        """
        outside_brackets_venue = r'(?:[^\s\[\]]+(?:\s[^\s\[\]]+)*)'
        # pmcids are not valid identifiers for 'venues'!
        inside_brackets_venue = fr'\[({"|".join(self.br_id_schemes_for_venues)}):\S+(?:\s({"|".join(self.br_id_schemes_for_venues)}):\S+)*\]'
        venue_pattern = fr'^(?:({outside_brackets_venue}\s{inside_brackets_venue})|({outside_brackets_venue}\s?)|({inside_brackets_venue}))$'

        if match(venue_pattern, venue_value):
            return True
        else:
            return False
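
    # Illustrative behaviour of wellformedness_venue() (examples added by the editor):
    #   wellformedness_venue('Journal of Informetrics [issn:1751-1577]')  -> True
    #   wellformedness_venue('Journal of Informetrics')                   -> True
    #   wellformedness_venue('Journal of Informetrics [1751-1577]')       -> False  (bracketed ID without scheme)
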
    def orphan_venue_id(self, venue_value: str) -> bool:
        """
        Detect venue IDs that appear outside square brackets.

        Returns ``True`` if the input string likely contains one or more
        bibliographic-resource IDs not enclosed in ``[]``.

        :param venue_value: The raw value of the ``venue`` field.
        :type venue_value: str
        :return: ``True`` if an orphan ID is found, ``False`` otherwise.
        :rtype: bool
        """
        if search(fr'({"|".join(self.br_id_schemes_for_venues)}):', sub(r'\[.*\]', '', venue_value)):
            return True
        else:
            return False

    def wellformedness_volume_issue(self, vi_value: str) -> bool:
        """
        Validate the well-formedness of a ``volume`` or ``issue`` field value.

        The value must be one or more non-whitespace tokens separated by single spaces.

        :param vi_value: The raw volume or issue string.
        :type vi_value: str
        :return: ``True`` if well-formed, ``False`` otherwise.
        :rtype: bool
        """
        vi_pattern = r'^\S+(?:\s\S+)*$'

        if match(vi_pattern, vi_value):
            return True
        else:
            return False
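
    # Illustrative behaviour of wellformedness_volume_issue() (examples added by the editor):
    #   wellformedness_volume_issue('42')               -> True
    #   wellformedness_volume_issue('Special Issue 2')  -> True
    #   wellformedness_volume_issue(' 42')              -> False  (leading whitespace)
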
    def wellformedness_page(self, page_value: str) -> bool:
        """
        Validate the well-formedness of the ``page`` field value.

        Accepts numeric ranges (``1-10``), Roman numeral ranges (``i-x``),
        and alphanumeric page ranges (``a1-b2``).

        :param page_value: The raw page string.
        :type page_value: str
        :return: ``True`` if well-formed, ``False`` otherwise.
        :rtype: bool
        """
        # todo: create stricter regex for roman numerals and valid intervals
        #  NB: incorrect roman numerals and impossible ranges (e.g. 200-20) still validate!
        natural_number = r'([1-9][0-9]*)'
        roman_numeral = r'([IiVvXxLlCcDdMm]+)'
        single_alphanum = r'((?:(?:[A-Za-z]|[α-ωΑ-Ω])?[1-9]\d*)|(?:[1-9]\d*(?:[A-Za-z]|[α-ωΑ-Ω])?))'
        normal_page_pattern = f'^(?:{natural_number}|{roman_numeral})-(?:{natural_number}|{roman_numeral})$'
        alphanum_page_pattern = f'^{single_alphanum}-{single_alphanum}$'

        if match(normal_page_pattern, page_value):
            return True
        elif match(alphanum_page_pattern, page_value):
            return True
        else:
            return False
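
    # Illustrative behaviour of wellformedness_page() (examples added by the
    # editor): this check covers syntax only; the logical consistency of the
    # interval is checked separately by check_page_interval() below.
    #   wellformedness_page('1-10')     -> True
    #   wellformedness_page('ix-xii')   -> True
    #   wellformedness_page('G27-G35')  -> True
    #   wellformedness_page('200-20')   -> True   (syntactically fine, logically impossible)
    #   wellformedness_page('10')       -> False  (a single page is not an interval)
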
    def check_page_interval(self, page_interval: str) -> bool:
        """
        Validate that the page interval is logically consistent.

        Verifies that the start page is less than or equal to the end page.
        Handles Arabic numerals, Roman numerals, and alphanumeric strings.

        :param page_interval: The value of the ``page`` field (e.g. ``"1-10"``).
        :type page_interval: str
        :return: ``True`` if the interval is valid or cannot be converted to
            integers, ``False`` if the interval is definitively invalid.
        :rtype: bool
        """

        def extract_segments(text):
            letters = findall(r'[a-zA-Z]+', text)
            numbers = findall(r'\d+', text)
            return letters, numbers

        both_num = page_interval.split('-')
        converted = []
        for num_str in both_num:
            if num_str.isnumeric():
                converted.append(int(num_str))
            else:
                try:
                    converted.append(fromRoman(num_str.upper()))
                except InvalidRomanNumeralError:
                    if both_num[0] == both_num[1]:
                        return True  # ignore cases with identical alphanumeric strings (e.g. "a12-a12")
                    elif both_num[0].isalnum() and both_num[1].isalnum():
                        alph1, num1 = extract_segments(both_num[0])
                        alph2, num2 = extract_segments(both_num[1])
                        if [l for l in (alph1, num1, alph2, num2) if len(l) > 1]:
                            return False  # exclude strs with non-contiguous alphabetic segments (e.g. 'a123b-c456')
                        char1 = alph1[0].lower() if alph1 else ''
                        char2 = alph2[0].lower() if alph2 else ''
                        dig1 = int(num1[0]) if num1 else 0
                        dig2 = int(num2[0]) if num2 else 0
                        if ((char1 == char2) or (char1 and not char2)) and (dig1 <= dig2):
                            return True
                        return False
                    else:
                        return False

        if converted[0] <= converted[1]:
            return True
        else:
            return False
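
    # Illustrative behaviour of check_page_interval() (examples added by the editor):
    #   check_page_interval('1-10')     -> True
    #   check_page_interval('200-20')   -> False  (start page greater than end page)
    #   check_page_interval('ix-XII')   -> True   (9 <= 12 after Roman-numeral conversion)
    #   check_page_interval('G27-G35')  -> True   (same letter prefix, 27 <= 35)
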
    def check_duplicate_ra_by_id(self, items: List) -> List[List[int]]:
        """
        Find in-field duplicates among author/editor items based on shared RA IDs.

        Two items are considered duplicates when they share at least one
        responsible-agent identifier (e.g. ``orcid:0000-0001``).

        :param items: List of :class:`~oc_validator.table_reader.AgentItem` objects.
        :type items: List
        :return: A list of duplicate groups. Each group is a sorted list of item
            indices that share at least one RA ID. An empty list means no duplicates.
        :rtype: List[List[int]]
        """
        pid_to_indices: dict = {}
        for idx, item in enumerate(items):
            for pid in item.ids:
                pid_to_indices.setdefault(pid, []).append(idx)

        seen_groups: set = set()
        result: List[List[int]] = []
        for indices in pid_to_indices.values():
            if len(indices) >= 2:
                group = tuple(sorted(set(indices)))
                if group not in seen_groups:
                    seen_groups.add(group)
                    result.append(list(group))

        return result
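
    # Minimal usage sketch for check_duplicate_ra_by_id() (added by the editor).
    # AgentItem comes from oc_validator.table_reader; a simple namespace with an
    # `ids` attribute stands in for it here, since only that attribute is read:
    #   from types import SimpleNamespace
    #   items = [SimpleNamespace(ids=['orcid:0000-0002-1825-0097']),
    #            SimpleNamespace(ids=['viaf:123']),
    #            SimpleNamespace(ids=['orcid:0000-0002-1825-0097', 'viaf:456'])]
    #   check_duplicate_ra_by_id(items)  -> [[0, 2]]
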
    def check_duplicate_publisher_by_raw(self, items: List) -> List[List[int]]:
        """
        Find in-field duplicates among publisher items based on raw string exact match.

        Two publisher items are considered duplicates when their raw string
        representations are identical.

        :param items: List of :class:`~oc_validator.table_reader.AgentItem` objects.
        :type items: List
        :return: A list of duplicate groups. Each group is a list of item indices
            whose raw strings are identical. An empty list means no duplicates.
        :rtype: List[List[int]]
        """
        raw_to_indices: dict = {}
        for idx, item in enumerate(items):
            raw_to_indices.setdefault(item._raw, []).append(idx)

        result: List[List[int]] = []
        for indices in raw_to_indices.values():
            if len(indices) >= 2:
                result.append(indices)

        return result
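
    # Illustrative behaviour of check_duplicate_publisher_by_raw() (note added by
    # the editor): items whose `_raw` strings are identical end up in the same
    # group, e.g. three items with raw strings 'ACM', 'Springer', 'ACM' would
    # yield [[0, 2]].
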
    def wellformedness_type(self, type_value: str) -> bool:
        """
        Validate the well-formedness of the ``type`` field value.

        The type must be one of the keys in the ID-type alignment dictionary.

        :param type_value: The raw type string.
        :type type_value: str
        :return: ``True`` if the type is recognised, ``False`` otherwise.
        :rtype: bool
        """

        if type_value in self.id_type_dict.keys():
            return True
        else:
            return False

    def get_missing_values(self, row: dict) -> dict:
        """
        Check whether a row has all required fields for its resource type.

        When the ``id`` field is empty or contains only ``temp:``/``local:`` IDs,
        certain other fields become mandatory depending on the ``type`` value.
        The returned dictionary maps field names to ``[0]`` (for fields that
        condition the requirement) or ``None`` (for missing fields).

        :param row: A dictionary representing a single CSV row.
        :type row: dict
        :return: Dictionary locating missing required fields. Empty if the row
            satisfies all requirements.
        :rtype: dict
        """

        # TODO: Consider using an external config file, as you do for checking id-type semantic alignment, since the list
        #  of accepted types might change/be extended frequently!

        missing = {}
        ids = row['id'].split(' ')
        internal_only_id = all(id.startswith('temp:') or id.startswith('local:') for id in ids)
        if not row['id'] or internal_only_id:  # ID value is missing or only temp/local IDs are specified

            if row['type']:  # ID is missing and 'type' is specified

                if row['type'] in ['book', 'dataset', 'data file', 'dissertation', 'edited book',
                                   'journal article', 'monograph', 'other', 'peer review', 'posted content',
                                   'web content', 'proceedings article', 'reference book', 'report']:
                    if not row['title']:
                        missing['type'] = [0]
                        missing['title'] = None
                    if not row['pub_date']:
                        missing['type'] = [0]
                        missing['pub_date'] = None
                    if not row['author'] and not row['editor']:
                        missing['type'] = [0]
                        if not row['author']:
                            missing['author'] = None
                        if not row['editor']:
                            missing['editor'] = None

                elif row['type'] in ['book chapter', 'book part', 'book section', 'book track', 'component',
                                     'reference entry']:
                    if not row['title']:
                        missing['type'] = [0]
                        missing['title'] = None
                    if not row['venue']:
                        missing['type'] = [0]
                        missing['venue'] = None

                elif row['type'] in ['book series', 'book set', 'journal', 'proceedings', 'proceedings series',
                                     'report series', 'standard', 'standard series']:
                    if not row['title']:
                        missing['type'] = [0]
                        missing['title'] = None

                elif row['type'] == 'journal issue':
                    if not row['venue']:
                        missing['type'] = [0]
                        missing['venue'] = None
                    if not row['title'] and not row['issue']:
                        missing['type'] = [0]
                        if not row['title']:
                            missing['title'] = None
                        if not row['issue']:
                            missing['issue'] = None

                elif row['type'] == 'journal volume':
                    if not row['venue']:
                        missing['type'] = [0]
                        missing['venue'] = None
                    if not row['title'] and not row['volume']:
                        missing['type'] = [0]
                        if not row['title']:
                            missing['title'] = None
                        if not row['volume']:
                            missing['volume'] = None

            else:

                if not row['title']:
                    missing['type'] = None
                    missing['title'] = None
                if not row['pub_date']:
                    missing['type'] = None
                    missing['pub_date'] = None
                if not row['author'] and not row['editor']:
                    missing['type'] = None
                    if not row['author']:
                        missing['author'] = None
                    if not row['editor']:
                        missing['editor'] = None

        # the 2 conditions below apply to any type of BR and regardless of an ID being specified
        # cfr. also docs/mandatory_fields.csv

        if row['volume'] and not row['venue']:
            missing['volume'] = [0]
            missing['venue'] = None

        if row['issue'] and not row['venue']:
            missing['issue'] = [0]
            missing['venue'] = None

        return missing
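
    # Worked example for get_missing_values() (added by the editor), assuming a
    # row dict with all META-CSV columns present:
    #   row = {'id': '', 'title': 'A title', 'author': 'Doe, John', 'editor': '',
    #          'pub_date': '', 'venue': '', 'volume': '', 'issue': '', 'page': '',
    #          'type': 'journal article', 'publisher': ''}
    #   get_missing_values(row)  -> {'type': [0], 'pub_date': None}
    # The empty 'id' makes 'pub_date' mandatory for a 'journal article'; 'type'
    # is flagged with [0] as the field that conditions the requirement.
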
    # # THIS FUNCTION IS THE OLD FUNCTION TO GET DUPLICATES, KEPT HERE FOR REFERENCE.
    # def get_duplicates_cits(self, entities: list, data_dict: list, messages) -> list:
    #     """
    #     Creates a list of dictionaries containing the duplication error in the whole document, either within a row
    #     (self-citation) or between two or more rows (duplicate citations).
    #     :param entities: list containing sets of strings (the IDs), where each set corresponds to a bibliographic entity
    #     :param data_dict: the list of the document's rows, read as dictionaries
    #     :param messages: the dictionary containing the messages as they're read from the .yaml config file
    #     :return: list of dictionaries, each carrying full info about each duplication error within the document.
    #     """
    #     visited_dicts = []
    #     report = []
    #     for row_idx, row in enumerate(data_dict):
    #         citation = {'citing_id': '', 'cited_id': ''}
    #
    #         citing_items = row['citing_id'].split(' ')
    #         for item in citing_items:
    #             if citation['citing_id'] == '':
    #                 for set_idx, set in enumerate(entities):
    #                     if item in set:  # mapping the single ID to its corresponding set representing the bibl. entity
    #                         citation['citing_id'] = set_idx
    #                         break
    #
    #         cited_items = row['cited_id'].split(' ')
    #         for item in cited_items:
    #             if citation['cited_id'] == '':
    #                 for set_idx, set in enumerate(entities):
    #                     if item in set:  # mapping the single ID to its corresponding set representing the bibl. entity
    #                         citation['cited_id'] = set_idx
    #                         break
    #
    #         # If a field contains only invalid items, it is not possible to map it to an entity set: process the row
    #         # only if both citing and cited are associated to an entity set, i.e. their value in the 'citation'
    #         # dictionary is not still an empty string (as it had been initialized).
    #         if citation['citing_id'] != '' and citation['cited_id'] != '':
    #
    #             if citation['citing_id'] == citation['cited_id']:  # SELF-CITATION warning (an entity cites itself)
    #                 table = {
    #                     row_idx: {
    #                         'citing_id': [idx for idx in range(len(citing_items))],
    #                         'cited_id': [idx for idx in range(len(cited_items))]
    #                     }
    #                 }
    #                 message = messages['m4']
    #                 report.append(
    #                     self.helper.create_error_dict(validation_level='csv_wellformedness', error_type='warning',
    #                                                   message=message, error_label='self-citation', located_in='field',
    #                                                   table=table, valid=True))
    #
    #             # SAVE CITATIONS BETWEEN ENTITIES IN A LIST.
    #             # Each citation is represented as a nested dictionary in which the key-values representing the entity-to-entity
    #             # citation are unique within the list, but the table representing the location of an INSTANCE of an
    #             # entity-to-entity citation is updated each time a new instance of such citation is found in the csv document.
    #
    #             citation_table = {
    #                 row_idx: {
    #                     'citing_id': [idx for idx in range(len(citing_items))],
    #                     'cited_id': [idx for idx in range(len(cited_items))]
    #                 }
    #             }
    #
    #             cit_info = {'citation': citation, 'table': citation_table}
    #
    #             if not visited_dicts:  # just for the first round of the iteration (when visited_dicts is empty)
    #                 visited_dicts.append(cit_info)
    #             else:
    #                 for dict_idx, cit_dict in enumerate(visited_dicts):
    #                     if citation == cit_dict['citation']:
    #                         visited_dicts[dict_idx]['table'].update(cit_info['table'])
    #                         break
    #                     elif dict_idx == (len(visited_dicts) - 1):
    #                         visited_dicts.append(cit_info)
    #
    #     for d in visited_dicts:
    #         if len(d['table']) > 1:  # if there's more than 1 row in table for a citation (duplicate rows error)
    #             table = d['table']
    #             message = messages['m5']
    #
    #             report.append(
    #                 self.helper.create_error_dict(validation_level='csv_wellformedness', error_type='error',
    #                                               message=message, error_label='duplicate_citation', located_in='row',
    #                                               table=table))
    #     return report
    def get_duplicates_cits(self, uf: Union[LmdbUnionFind, InMemoryUnionFind], data_cache: Union[LmdbCache, InMemoryCache], messages: dict) -> Generator:
        """
        Find duplicate citations and self-citations in a CITS-CSV document.

        No new large structures are held in RAM: the citation-occurrence map is
        persisted in a temporary cache and iterated at the end to detect duplicates.

        :param uf: Union-Find populated with all IDs encountered during validation.
        :type uf: Union[LmdbUnionFind, InMemoryUnionFind]
        :param data_cache: Cache mapping ``str(row_idx)`` to a
            ``(citing_id_str, cited_id_str)`` tuple for every row.
        :type data_cache: Union[LmdbCache, InMemoryCache]
        :param messages: Error-message template dictionary (from ``messages.yaml``).
        :type messages: dict
        :return: Generator of error-dict objects.
        :rtype: Generator
        """
        # citation_map_cache: key = "citing_root\x00cited_root",
        # value = {row_idx: {'citing_id': [...], 'cited_id': [...]}}

        # Infer from the type of data_cache whether to use an in-memory
        # object or an LMDB cache to collect the positions of duplicate issues
        if isinstance(data_cache, InMemoryCache):
            res_caching = InMemoryCache
        else:
            res_caching = LmdbCache

        with res_caching('dup_cits_citation_map') as citation_map_cache:
            for str_idx, (citing_id, cited_id) in data_cache.items():
                row_idx = int(str_idx)
                citing_items = citing_id.split(' ')
                cited_items = cited_id.split(' ')

                # Find first registered citing / cited entity root (O(1) LMDB lookup each)
                citing_root = next(
                    (uf.find(item) for item in citing_items if item in uf), None
                )
                cited_root = next(
                    (uf.find(item) for item in cited_items if item in uf), None
                )

                if citing_root is None or cited_root is None:
                    continue  # row has no mappable entities: skip it

                # SELF-CITATION: citing and cited entity are the same
                if citing_root == cited_root:
                    table = {
                        row_idx: {
                            'citing_id': list(range(len(citing_items))),
                            'cited_id': list(range(len(cited_items))),
                        }
                    }
                    yield self.helper.create_error_dict(
                        validation_level='csv_wellformedness',
                        error_type='warning',
                        message=messages['m4'],
                        error_label='self-citation',
                        located_in='field',
                        table=table,
                        valid=True,
                    )

                # Accumulate citation occurrences in LMDB (read-modify-write)
                cit_key = f'{citing_root}\x00{cited_root}'
                row_entry = {
                    row_idx: {
                        'citing_id': list(range(len(citing_items))),
                        'cited_id': list(range(len(cited_items))),
                    }
                }
                existing = citation_map_cache.get(cit_key)
                if existing is None:
                    citation_map_cache[cit_key] = row_entry
                else:
                    existing.update(row_entry)
                    citation_map_cache[cit_key] = existing

            # Second scan: yield errors for citations that appear more than once
            for _cit_key, table in citation_map_cache.items():
                if len(table) > 1:
                    yield self.helper.create_error_dict(
                        validation_level='csv_wellformedness',
                        error_type='error',
                        message=messages['m5'],
                        error_label='duplicate_citation',
                        located_in='row',
                        table=table,
                    )
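
    # Usage note for get_duplicates_cits() (added by the editor): the caller is
    # expected to have populated `uf` so that every valid ID of the same
    # bibliographic entity resolves to a common root via uf.find(), and
    # `data_cache` so that data_cache.items() yields
    # (str(row_idx), (citing_id_field, cited_id_field)) pairs. The method then
    # streams error dicts, so it is typically consumed with a for loop or
    # list(...). How uf and data_cache are filled is the responsibility of the
    # validator driving this module (see oc_validator.lmdb_cache).
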
    # # THIS FUNCTION IS THE OLD FUNCTION TO GET DUPLICATES, KEPT HERE FOR REFERENCE.
    # def get_duplicates_meta(self, entities: list, data_dict: list, messages) -> list:
    #     """
    #     Creates a list of dictionaries containing the duplication error in the whole document between two or more rows.
    #     :param entities: list containing sets of strings (the IDs), where each set corresponds to a bibliographic entity.
    #     :param data_dict: the list of the document's rows, read as dictionaries
    #     :param messages: the dictionary containing the messages as they're read from the .yaml config file
    #     :return: list of dictionaries, each carrying full info about each duplication error within the document.
    #     """
    #     visited_dicts = []
    #     report = []
    #     for row_idx, row in enumerate(data_dict):
    #         br = {'meta_id': None, 'table': {}}
    #         items = row['id'].split(' ')
    #
    #         for item in items:
    #             if not br['meta_id']:
    #                 for set_idx, set in enumerate(entities):
    #                     if item in set:  # mapping the single ID to its corresponding set representing the bibl. entity
    #                         br['meta_id'] = str(set_idx)
    #                         br['table'] = {row_idx: {'id': list(range(len(items)))}}
    #                         break
    #
    #         # process row only if a meta_id has been associated to it (i.e. id field contains at least one valid identifier)
    #         if br['meta_id']:
    #             if not visited_dicts:  # just for the first round of the iteration (when visited_dicts is empty)
    #                 visited_dicts.append(br)
    #             else:
    #                 for visited_br_idx, visited_br in enumerate(visited_dicts):
    #                     if br['meta_id'] == visited_br['meta_id']:
    #                         visited_dicts[visited_br_idx]['table'].update(br['table'])
    #                         break
    #                     elif visited_br_idx == (len(visited_dicts) - 1):
    #                         visited_dicts.append(br)
    #
    #     for d in visited_dicts:
    #         if len(d['table']) > 1:  # if there's more than 1 row in table for a br (duplicate rows error)
    #             table = d['table']
    #             message = messages['m11']
    #
    #             report.append(
    #                 self.helper.create_error_dict(validation_level='csv_wellformedness', error_type='error',
    #                                               message=message, error_label='duplicate_br', located_in='row',
    #                                               table=table))
    #
    #     return report
    def get_duplicates_meta(self, uf: Union[LmdbUnionFind, InMemoryUnionFind], data_cache: Union[LmdbCache, InMemoryCache], messages: dict) -> Generator:
        """
        Find duplicate bibliographic entities in a META-CSV document.

        No new large structures are held in RAM: the entity-occurrence map is
        persisted in a temporary cache and iterated at the end to detect duplicates.

        :param uf: Union-Find populated with all IDs encountered during validation.
        :type uf: Union[LmdbUnionFind, InMemoryUnionFind]
        :param data_cache: Cache mapping ``str(row_idx)`` to the raw ``'id'``
            field string for every row.
        :type data_cache: Union[LmdbCache, InMemoryCache]
        :param messages: Error-message template dictionary (from ``messages.yaml``).
        :type messages: dict
        :return: Generator of error-dict objects.
        :rtype: Generator
        """
        # meta_map_cache: key = entity root string,
        # value = {row_idx: {'id': [0, 1, ...]}}

        # Infer from the type of data_cache whether to use an in-memory
        # object or an LMDB cache to collect the positions of duplicate issues
        if isinstance(data_cache, InMemoryCache):
            res_caching = InMemoryCache
        else:
            res_caching = LmdbCache

        with res_caching('dup_meta_entity_map') as meta_map_cache:
            for str_idx, id_value in data_cache.items():
                row_idx = int(str_idx)
                items = id_value.split(' ')

                # Find the first registered ID and resolve its entity root
                root = next(
                    (uf.find(item) for item in items if item in uf), None
                )
                if root is None:
                    continue  # row has no valid mapped IDs: skip it

                row_table = {row_idx: {'id': list(range(len(items)))}}

                # Read-modify-write: accumulate occurrences per entity root
                existing: dict = meta_map_cache.get(root)
                if existing is None:
                    meta_map_cache[root] = row_table
                else:
                    existing.update(row_table)
                    meta_map_cache[root] = existing

            # Second scan: yield errors for entities that appear more than once
            for _root, table in meta_map_cache.items():
                if len(table) > 1:
                    yield self.helper.create_error_dict(
                        validation_level='csv_wellformedness',
                        error_type='error',
                        message=messages['m11'],
                        error_label='duplicate_br',
                        located_in='row',
                        table=table,
                    )
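
    # Usage note for get_duplicates_meta() (added by the editor), mirroring
    # get_duplicates_cits(): here data_cache.items() is expected to yield
    # (str(row_idx), id_field_string) pairs, and two rows are reported as
    # duplicate_br when at least one ID in each resolves to the same Union-Find
    # root. Consumed as a generator, e.g.:
    #   errors = list(wf.get_duplicates_meta(uf, data_cache, messages))
    # where `wf` is a Wellformedness instance and `uf`, `data_cache`, `messages`
    # are prepared by the calling validator.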