Coverage for oc_meta / lib / cleaner.py: 96%
239 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1# SPDX-FileCopyrightText: 2019 Silvio Peroni <silvio.peroni@unibo.it>
2# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com>
3# SPDX-FileCopyrightText: 2021 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8import html
9import re
10from collections import OrderedDict
11from datetime import datetime
12from typing import Tuple, Union
14from dateutil.parser import parse
15from oc_ds_converter.oc_idmanager import (DOIManager, ISBNManager, ISSNManager,
16 ORCIDManager)
18from oc_meta.lib.master_of_regex import (
19 RE_COMMA_AND_SPACES,
20 RE_INVALID_VI_PATTERNS,
21 RE_ISSUES_VALID_PATTERNS,
22 RE_VOLUMES_VALID_PATTERNS,
23 split_name_and_ids,
24)
# Translation table mapping every hyphen/dash/minus look-alike to the plain
# ASCII hyphen-minus (U+002D); consumed by normalize_hyphens() and
# clean_agent_name(). Built once at import time so translate() runs in a
# single C-level pass.
_HYPHEN_TRANS = str.maketrans({
    '\u00AD': '\u002D', # Soft hyphen
    '\u06D4': '\u002D', # Arabic Full Stop
    '\u2010': '\u002D', # Hyphen
    '\u2011': '\u002D', # Non-breaking Hyphen
    '\u2012': '\u002D', # Figure Dash
    '\u2013': '\u002D', # En-Dash
    '\u2014': '\u002D', # Em-Dash
    '\u2043': '\u002D', # Hyphen Bullet
    '\u2212': '\u002D', # Minus Sign
    '\u2796': '\u002D', # Heavy Minus Sign
    '\u2CBA': '\u002D', # Coptic Capital Letter Dialect-p Ni
    '\uFE58': '\u002D', # Small Em Dash
})
# Translation table mapping ambiguous space-like characters to a plain
# space (U+0020); consumed by normalize_spaces().
_SPACE_TRANS = str.maketrans({
    '\u0009': '\u0020', # Character Tabulation
    '\u00A0': '\u0020', # No-break space
    '\u200B': '\u0020', # Zero width space
    '\u202F': '\u0020', # Narrow no-break space
    '\u2003': '\u0020', # Em Space
    '\u2005': '\u0020', # Four-Per-Em Space
    '\u2009': '\u0020', # Thin Space
})
# Translation table for control characters and extended ASCII to space
# Covers: 0x00-0x1F (control chars), 0x7F (DEL), 0x80-0xFF (extended ASCII)
# Consumed by remove_ascii(); the resulting extra spaces are collapsed there.
_ASCII_CONTROL_TRANS = str.maketrans(
    {chr(i): ' ' for i in range(0x00, 0x20)}
    | {chr(0x7F): ' '}
    | {chr(i): ' ' for i in range(0x80, 0x100)}
)
# Module-level singleton identifier managers shared by normalize_id(), built
# once so validation does not reconstruct them per call. DOI and ORCID
# managers are created with use_api_service=False, so no network lookups are
# performed — validation is purely local.
_DOI_MANAGER = DOIManager(use_api_service=False, storage_manager=None)
_ISBN_MANAGER = ISBNManager()
_ISSN_MANAGER = ISSNManager()
_ORCID_MANAGER = ORCIDManager(use_api_service=False, storage_manager=None)
def normalize_hyphens(string: str) -> str:
    '''
    Replace every hyphen/dash/minus look-alike with the ASCII hyphen-minus
    character (U+002D). Applied to pages, IDs and dates.

    Characters normalized (see _HYPHEN_TRANS): U+00AD soft hyphen,
    U+06D4 Arabic full stop, U+2010 hyphen, U+2011 non-breaking hyphen,
    U+2012 figure dash, U+2013 en dash, U+2014 em dash, U+2043 hyphen
    bullet, U+2212 minus sign, U+2796 heavy minus sign, U+2CBA Coptic
    capital letter dialect-p ni, U+FE58 small em dash.

    :returns: str -- the string with normalized hyphens
    '''
    return string.translate(_HYPHEN_TRANS)
def normalize_spaces(string: str) -> str:
    '''
    Replace ambiguous space-like characters with a plain space (U+0020).

    Characters normalized (see _SPACE_TRANS): U+0009 character tabulation,
    U+00A0 no-break space, U+200B zero width space, U+202F narrow no-break
    space, U+2003 em space, U+2005 four-per-em space, U+2009 thin space.

    :returns: str -- the string with normalized spaces
    '''
    # A trailing .replace(' ', '\u0020') was removed: it replaced a plain
    # space with itself, a no-op. NOTE(review): if that first argument was
    # originally a non-ASCII space lost in transcription (e.g. U+3000),
    # add that character to _SPACE_TRANS instead.
    return string.translate(_SPACE_TRANS)
def clean_title(string: str, normalize: bool = True) -> str:
    '''
    Titlecase every word of a bibliographic title ('venue' and 'title'
    columns) unless the word already contains an uppercase letter — likely
    an acronym, e.g. 'FaBiO and CiTO'. A title that is entirely uppercase
    is first lowercased so it does not fall under the acronym exception.
    Splitting and rejoining also collapses runs of whitespace.

    :param string: the raw title
    :param normalize: when False, the title is returned untouched
    :returns: str -- The cleaned title
    '''
    if not normalize:
        return string
    text = string.lower() if string.isupper() else string
    tokens = [
        tok if any(ch.isupper() for ch in tok) else tok.title()
        for tok in text.split()
    ]
    return ' '.join(tokens)
def _date_parse_hack(date: str) -> str:
    '''
    Detect how much of *date* is actually specified by parsing it twice
    with different default datetimes: any component identical across both
    parses must have been present in the input, while a component that
    follows the default was missing. Returns 'YYYY-MM-DD', 'YYYY-MM',
    'YYYY', or '' accordingly. Propagates ValueError when *date* cannot
    be parsed at all.
    '''
    probe_a = parse(date, default=datetime(2001, 1, 1))
    probe_b = parse(date, default=datetime(2002, 2, 2))
    same_year = probe_a.year == probe_b.year
    same_month = same_year and probe_a.month == probe_b.month
    same_day = same_month and probe_a.day == probe_b.day
    if same_day:
        return parse(date).strftime('%Y-%m-%d')
    if same_month:
        return parse(date).strftime('%Y-%m')
    if same_year:
        return parse(date).strftime('%Y')
    return ''
def clean_date(string: str) -> str:
    '''
    Try to parse a date string, checking both the format (YYYY-MM-DD) and
    the value (e.g. 30 February is not a valid date). Invalid trailing
    components are progressively dropped: 2020-02-30 becomes 2020-02
    (invalid day), 2020-27-12 becomes 2020 (invalid month and day). When
    even the year is unusable (e.g. year > 9999) the whole value is
    discarded.

    :returns: str -- The cleaned date or an empty string
    '''
    # Build the fallback chain up front: the full string first, then — for
    # the recognised fixed-width layouts only — the same string with the
    # day (last 3 chars) and then the month (last 6 chars) stripped off.
    candidates = [string]
    if len(string) == 10:   # e.g. 2021-12-17
        candidates.append(string[:-3])
        candidates.append(string[:-6])
    elif len(string) == 7:  # e.g. 2021-12
        candidates.append(string[:-3])
    for candidate in candidates:
        try:
            return _date_parse_hack(candidate)
        except ValueError:
            continue
    return ''
def clean_name(string: str) -> str:
    '''
    Capitalize the first letter of each element of the name (via
    clean_title, which preserves acronym-like tokens) and remove
    superfluous spaces. A 'Surname, Given' form is preserved; when the
    surname part is empty, an empty string is returned.

    :returns: str -- The cleaned name
    '''
    def _titlecase(fragment: str) -> str:
        # Clean each whitespace-separated token, collapsing extra spaces.
        return ' '.join(clean_title(token) for token in fragment.split())

    if ',' not in string:
        return _titlecase(string)
    parts = RE_COMMA_AND_SPACES.split(string)
    surname = _titlecase(parts[0])
    given = _titlecase(parts[1])
    if not surname:
        return ''
    return surname + ', ' + given
def clean_agent_name(string: str) -> str:
    '''
    Clean a responsible agent name (author, editor, publisher).

    Drops '[', ']', ';' and '?', keeps dots only when the preceding
    character is a letter (initials like 'J.'), and preserves letters,
    numbers, spaces, '&' and apostrophes so organization names such as
    "3M" or "Smith & Sons" survive. HTML entities are decoded, hyphen
    look-alikes normalized, and extra spaces collapsed.

    :returns: str -- The cleaned agent name.
    '''
    banned = {'[', ']', ';', '?'}
    kept = []
    previous = ''
    for ch in string:
        if ch == '.':
            # A dot survives only as part of an initial/abbreviation.
            if previous.isalpha():
                kept.append(ch)
        elif ch not in banned:
            kept.append(ch)
        previous = ch
    collapsed = ' '.join(''.join(kept).split())
    return html.unescape(collapsed).translate(_HYPHEN_TRANS)
285def _normalize_ra_name(raw_name: str) -> str:
286 """Normalize a RA name into one of: '', 'Full Name', 'Last, First', 'Last, '.
288 Returns '' when the name is absent, literally 'Not Available', or a
289 comma-separated pair whose surname is missing. Bare names are run
290 through :func:`clean_agent_name` to drop bracket / punctuation junk.
291 """
292 name = raw_name.strip()
293 if not name:
294 return ''
295 if ',' in name:
296 last, _, first = name.partition(',')
297 last = last.strip()
298 first = first.strip()
299 if last.lower() == 'not available':
300 last = ''
301 if first.lower() == 'not available':
302 first = ''
303 if not last:
304 return ''
305 return f'{last}, {first}' if first else f'{last}, '
306 cleaned = clean_agent_name(name)
307 if cleaned.lower() == 'not available':
308 return ''
309 return cleaned
def clean_ra_list(ra_list: list) -> list:
    '''
    Clean a list of responsible agents: normalize names, drop 'Not
    Available' entries, remove later duplicates that share a name and at
    least one id, and strip identifiers that appear under more than one
    agent.

    :returns: list -- The cleaned responsible agents' list
    '''
    # Pass 1: normalize each entry into (bucket_key, name, ids) and gather
    # the ids seen under each bucket. Named entries bucket by normalized
    # name; nameless (ids-only) entries bucket by the raw input string so
    # each remains distinct.
    entries = []
    ids_per_bucket = OrderedDict()
    for raw_entry in ra_list:
        raw_name, ids_chunk = split_name_and_ids(raw_entry)
        name = _normalize_ra_name(raw_name)
        identifiers = ids_chunk.split()
        if not name and not identifiers:
            continue
        bucket_key = name if name else raw_entry
        entries.append((bucket_key, name, identifiers))
        if identifiers:
            bucket = ids_per_bucket.setdefault(bucket_key, OrderedDict())
            for identifier in identifiers:
                bucket[identifier] = None

    # Pass 2: an identifier listed under two different buckets cannot
    # unambiguously identify a single agent, so it is dropped everywhere.
    tally = {}
    for bucket in ids_per_bucket.values():
        for identifier in bucket:
            tally[identifier] = tally.get(identifier, 0) + 1
    ambiguous = {identifier for identifier, count in tally.items() if count > 1}

    # Pass 3: emit entries in input order, skipping any later entry whose
    # name and at least one surviving id were already emitted.
    cleaned = []
    emitted_ids = OrderedDict()
    for _, name, identifiers in entries:
        surviving = [i for i in identifiers if i not in ambiguous]
        joined = ' '.join(surviving)
        if not name:
            cleaned.append(f'[{joined}]')
            continue
        surviving_set = set(surviving)
        if name in emitted_ids and emitted_ids[name] & surviving_set:
            continue
        emitted_ids.setdefault(name, set()).update(surviving_set)
        cleaned.append(f'{name} [{joined}]' if surviving else name)
    return cleaned
def normalize_id(string: str) -> Union[str, None]:
    '''
    Verify and normalize an identifier whose schema corresponds to a DOI,
    an ISSN, an ISBN or an ORCID. Any other schema is passed through
    unchanged apart from lowercasing the schema prefix.

    :param string: an identifier in 'schema:value' form (a missing colon
        raises ValueError from split, as before via IndexError-adjacent
        unpacking)
    :returns: Union[str, None] -- The normalized identifier if it is valid, None otherwise
    '''
    schema, value = string.split(':', 1)
    schema = schema.lower()
    if schema == 'doi':
        valid_id = _DOI_MANAGER.normalise(value, include_prefix=True) if _DOI_MANAGER.syntax_ok(value) else None
    elif schema == 'isbn':
        valid_id = _ISBN_MANAGER.normalise(value, include_prefix=True) if _ISBN_MANAGER.is_valid(value, get_extra_info=False) else None
    elif schema == 'issn':
        if value == '0000-0000':
            # Placeholder ISSN that passes the checksum but is meaningless.
            valid_id = None
        else:
            try:
                valid_id = _ISSN_MANAGER.normalise(value, include_prefix=True) if _ISSN_MANAGER.is_valid(value, get_extra_info=False) else None
            except ValueError as err:
                # Previously this printed the value and did `raise(ValueError)`,
                # which raised the bare class and lost the original error.
                # Chain instead so callers see both the offending value and
                # the underlying cause.
                raise ValueError(f'Invalid ISSN value: {value!r}') from err
    elif schema == 'orcid':
        valid_id = _ORCID_MANAGER.normalise(value, include_prefix=True) if _ORCID_MANAGER.is_valid(value, get_extra_info=False) else None
    else:
        valid_id = f'{schema}:{value}'
    return valid_id
def clean_volume_and_issue(row: dict) -> None:
    '''
    Normalize the 'volume' and 'issue' fields of *row* in place.

    First, each field is hyphen/space-normalized and HTML-unescaped, then
    matched against RE_INVALID_VI_PATTERNS; each matching pattern's
    strategy rewrites the field and may recover a volume, issue or
    pub_date via _fix_invalid_vi. Second, a value that matches a valid
    pattern of the *other* field (a volume-looking issue, or vice versa)
    is swapped into its proper column, adjusting row['type'] when only one
    side moves.

    NOTE(review): iterating `{'volume', 'issue'}` (a set of strings) has a
    hash-randomized order, and `output`/`row` writes below depend on which
    field is processed last — confirm whether this order-dependence is
    intended.

    :param row: a CSV row dict with at least 'volume', 'issue',
        'pub_date' and 'type' keys; mutated in place
    :returns: None
    '''
    # Values recovered from invalid patterns; only used to fill fields
    # that end up empty.
    output = {'volume': '', 'issue': '', 'pub_date': ''}
    for field in {'volume', 'issue'}:
        vi = row[field]
        vi = normalize_hyphens(vi)
        vi = normalize_spaces(vi).strip()
        vi = html.unescape(vi)
        for compiled_pattern, strategy in RE_INVALID_VI_PATTERNS.items():
            capturing_groups = compiled_pattern.search(vi)
            if capturing_groups:
                # Strategy names drive both the immediate rewrite here and
                # the group extraction in _fix_invalid_vi.
                if strategy == 'del':
                    row[field] = ''
                elif strategy == 'do_nothing':
                    # Keep the normalized value as-is.
                    row[field] = vi
                elif strategy == 's)':
                    # Re-append a truncated trailing 's)'.
                    row[field] = f'{vi}s)'
                else:
                    row[field] = ''
                whatever, volume, issue, pub_date = _fix_invalid_vi(capturing_groups, strategy)
                # 'whatever' is a recovered standalone value ('all'/'sep'
                # strategies) that overrides the rewrite above.
                row[field] = whatever if whatever else row[field]
                output['volume'] = volume if volume else ''
                output['issue'] = issue if issue else ''
                output['pub_date'] = pub_date if pub_date else ''
    # Recovered values only fill fields that are still empty.
    row['volume'] = output['volume'] if not row['volume'] else row['volume']
    row['issue'] = output['issue'] if not row['issue'] else row['issue']
    row['pub_date'] = output['pub_date'] if not row['pub_date'] else row['pub_date']
    # Detect values sitting in the wrong column: an 'issue' matching a
    # valid volume pattern, or a 'volume' matching a valid issue pattern.
    switch_vi = {'volume': '', 'issue': ''}
    for field in {'volume', 'issue'}:
        vi = row[field]
        for compiled_pattern in RE_VOLUMES_VALID_PATTERNS:
            if compiled_pattern.search(vi):
                if field == 'issue':
                    switch_vi['volume'] = vi
        for compiled_pattern in RE_ISSUES_VALID_PATTERNS:
            if compiled_pattern.search(vi):
                if field == 'volume':
                    switch_vi['issue'] = vi
    if switch_vi['volume'] and switch_vi['issue']:
        # Both misplaced: swap them.
        row['volume'] = switch_vi['volume']
        row['issue'] = switch_vi['issue']
    elif switch_vi['volume'] and not row['volume']:
        # Issue value was really a volume: move it and retype the row.
        row['volume'] = switch_vi['volume']
        row['issue'] = ''
        row['type'] = 'journal volume' if row['type'] == 'journal issue' else row['type']
    elif switch_vi['issue'] and not row['issue']:
        # Volume value was really an issue: move it and retype the row.
        row['issue'] = switch_vi['issue']
        row['volume'] = ''
        row['type'] = 'journal issue' if row['type'] == 'journal volume' else row['type']
446def _fix_invalid_vi(capturing_groups: re.Match, strategy: str) -> Tuple[str | None, str | None, str | None, str | None]:
447 vol_group = 1 if 'vol_iss' in strategy else 2
448 iss_group = 1 if 'iss_vol' in strategy else 2
449 whatever = None
450 volume = None
451 issue = None
452 pub_date = None
453 if 'vol' in strategy and 'iss' in strategy:
454 volume = capturing_groups.group(vol_group)
455 issue = capturing_groups.group(iss_group)
456 if 'year' in strategy:
457 pub_date = capturing_groups.group(3)
458 elif strategy == 'all':
459 whatever = capturing_groups.group(1)
460 elif strategy == 'sep':
461 first = capturing_groups.group(1)
462 second = capturing_groups.group(2)
463 whatever = f'{first}-{second}'
464 return whatever, volume, issue, pub_date
def remove_ascii(string: str) -> str:
    '''
    Replace ASCII control characters (0x00-0x1F, 0x7F) and extended-ASCII
    code points (0x80-0xFF) with spaces, then collapse whitespace runs
    into single spaces and trim the ends.

    :returns: str -- the cleaned string
    '''
    translated = string.translate(_ASCII_CONTROL_TRANS)
    return ' '.join(translated.split())