Coverage for oc_meta/lib/cleaner.py: 96%
259 statements
coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright 2019 Silvio Peroni <essepuntato@gmail.com>
# Copyright 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com>
# Copyright 2021 Simone Persiani <iosonopersia@gmail.com>
# Copyright 2021-2022 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.
import html
import re
from collections import OrderedDict
from datetime import datetime
from typing import Tuple, Union

from dateutil.parser import parse
from oc_ds_converter.oc_idmanager import (DOIManager, ISBNManager, ISSNManager,
                                          ORCIDManager)

from oc_meta.lib.master_of_regex import *
from oc_meta.lib.master_of_regex import (invalid_vi_patterns,
                                         issues_valid_patterns,
                                         volumes_valid_patterns)


class Cleaner:
    def __init__(self, string:str):
        '''
        :param string: the string to be cleaned.
        :type string: str
        '''
        self.string = string

    def normalize_hyphens(self) -> str:
        '''
        It replaces any hyphen, dash and minus sign with a hyphen-minus character.
        This is done for pages, IDs and dates.

        .. list-table:: Comparison between the various characters similar to hyphen-minus
            :widths: 25 25 50
            :header-rows: 1

            * - UTF-8
              - SIGN
              - NAME
            * - U+002D
              - -
              - Hyphen-minus
            * - U+00AD
              - ­
              - Soft hyphen
            * - U+06D4
              - ۔
              - Arabic Full Stop
            * - U+2010
              - ‐
              - Hyphen
            * - U+2011
              - ‑
              - Non-breaking Hyphen
            * - U+2012
              - ‒
              - Figure Dash
            * - U+2013
              - –
              - En-Dash
            * - U+2014
              - —
              - Em-Dash
            * - U+2043
              - ⁃
              - Hyphen Bullet
            * - U+2212
              - −
              - Minus Sign
            * - U+2796
              - ➖
              - Heavy Minus Sign
            * - U+2CBA
              - Ⲻ
              - Coptic Capital Letter Dialect-p Ni
            * - U+FE58
              - ﹘
              - Small Em Dash

        :returns: str -- the string with normalized hyphens
        '''
        string = self.string
        wrong_characters = {'\u00AD', '\u06D4', '\u2010', '\u2011', '\u2012', '\u2013', '\u2014', '\u2043', '\u2212', '\u2796', '\u2CBA', '\uFE58'}
        for c in wrong_characters:
            string = string.replace(c, '\u002D')
        if 'isbn:' in string:
            # str.replace returns a new string: assign it, so that ISBN hyphens are actually removed
            string = string.replace('\u002D', '')
        return string
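
    # Illustrative usage (example values only):
    #   Cleaner('2019–2020').normalize_hyphens()               # en dash (U+2013) -> '2019-2020'
    #   Cleaner('isbn:978–3–16–148410–0').normalize_hyphens()  # -> 'isbn:9783161484100' (hyphens are dropped for ISBNs)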

    def normalize_spaces(self) -> str:
        '''
        It replaces any ambiguous spaces with a space.

        .. list-table:: List of the various characters similar to the space
            :widths: 25 25 50
            :header-rows: 1

            * - UTF-8
              - NAME
            * - U+0020
              - Space
            * - U+0009
              - Character Tabulation
            * - U+00A0
              - No-break space
            * - U+200B
              - Zero width space
            * - U+202F
              - Narrow no-break space
            * - U+2003
              - Em Space
            * - U+2005
              - Four-Per-Em Space
            * - U+2009
              - Thin Space

        :returns: str -- the string with normalized spaces
        '''
        string = self.string
        wrong_characters = {'\u0009', '\u00A0', ' ', '\u200B', '\u202F', '\u2003', '\u2005', '\u2009'}
        for c in wrong_characters:
            string = string.replace(c, '\u0020')
        return string
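
    # Illustrative usage (example values only):
    #   Cleaner('Annual\u00A0Review').normalize_spaces()  # no-break space -> 'Annual Review'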

    def clean_title(self, normalize_titles:bool=True) -> str:
        '''
        Concerning titles of bibliographic resources ('venue' and 'title' columns),
        every word in the title is capitalized except for those that have capitals within them
        (probably acronyms, e.g. 'FaBiO and CiTO'). This exception, however, does not include entirely capitalized titles.
        Finally, null characters and spaces are removed.

        :returns: str -- The cleaned title
        '''
        title = self.string
        if normalize_titles:
            if title.isupper():
                title = title.lower()
            words = title.split()
            for i, w in enumerate(words):
                if not any(x.isupper() for x in w):
                    words[i] = w.title()
            new_title = ' '.join(words)
        else:
            new_title = title
        return new_title
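
    # Illustrative usage (example values only):
    #   Cleaner('the FaBiO and CiTO ontologies').clean_title()  # -> 'The FaBiO And CiTO Ontologies'
    #   Cleaner('A JOURNAL TITLE').clean_title()                 # entirely uppercase -> 'A Journal Title'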

    def __date_parse_hack(self, date:str) -> str:
        dt = parse(date, default=datetime(2001, 1, 1))
        dt2 = parse(date, default=datetime(2002, 2, 2))

        if dt.year == dt2.year and dt.month == dt2.month and dt.day == dt2.day:
            clean_date = parse(date).strftime('%Y-%m-%d')
        elif dt.year == dt2.year and dt.month == dt2.month:
            clean_date = parse(date).strftime('%Y-%m')
        elif dt.year == dt2.year:
            clean_date = parse(date).strftime('%Y')
        else:
            clean_date = ''
        return clean_date
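
    # The double parse above uses two different default dates to detect which
    # components are actually present in the input: a component that differs
    # between the two results came from the default, not from the string, so the
    # output keeps only the components found in the input (e.g. '2020' -> '2020').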

    def clean_date(self) -> str:
        '''
        It tries to parse a date-string into a datetime object,
        considering both the validity of the format (YYYY-MM-DD) and the value (e.g. 30 February is not a valid date).
        For example, the date 2020-02-30 will become 2020-02, because the day is invalid.
        On the other hand, 2020-27-12 will become 2020, since both the month and the day are invalid.
        If the year is not valid (e.g. year > 9999), the date is discarded entirely.

        :returns: str -- The cleaned date or an empty string
        '''
        date = self.string
        try:
            date = self.__date_parse_hack(date)
        except ValueError:
            try:
                # e.g. 2021-12-17
                if len(date) == 10:
                    try:
                        # Maybe only the day is invalid, try year-month
                        new_date = date[:-3]
                        date = self.__date_parse_hack(new_date)
                    except ValueError:
                        try:
                            # Maybe only the month is invalid, try year
                            new_date = date[:-6]
                            date = self.__date_parse_hack(new_date)
                        except ValueError:
                            date = ''
                # e.g. 2021-12
                elif len(date) == 7:
                    # Maybe only the month is invalid, try year
                    try:
                        new_date = date[:-3]
                        date = self.__date_parse_hack(new_date)
                    except ValueError:
                        date = ''
                else:
                    date = ''
            except ValueError:
                date = ''
        return date
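
    # Illustrative usage (example values only):
    #   Cleaner('2020-02-30').clean_date()  # invalid day -> '2020-02'
    #   Cleaner('2020-27-12').clean_date()  # invalid month and day -> '2020'
    #   Cleaner('Preprint').clean_date()    # not a date -> ''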

    def clean_name(self) -> str:
        '''
        The first letter of each element of the name is capitalized and superfluous spaces are removed.

        :returns: str -- The cleaned name
        '''
        name = self.string
        if ',' in name:
            split_name = re.split(comma_and_spaces, name)
            first_name = split_name[1].split()
            for i, w in enumerate(first_name):
                first_name[i] = Cleaner(w).clean_title()
            new_first_name = ' '.join(first_name)
            surname = split_name[0].split()
            for i, w in enumerate(surname):
                surname[i] = Cleaner(w).clean_title()
            new_surname = ' '.join(surname)
            if new_surname:
                new_name = new_surname + ', ' + new_first_name
            else:
                new_name = ''
        else:
            split_name = name.split()
            for i, w in enumerate(split_name):
                split_name[i] = Cleaner(w).clean_title()
            new_name = ' '.join(split_name)
        return new_name
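
    # Illustrative usage (example values only; assumes comma_and_spaces splits on the comma):
    #   Cleaner('doe, jane').clean_name()      # -> 'Doe, Jane'
    #   Cleaner('  jane   doe ').clean_name()  # -> 'Jane Doe'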

    def remove_unwanted_characters(self) -> str:
        '''
        This method removes unwanted characters from a string: square brackets, semicolons,
        question marks, and dots that are not preceded by a letter.
        Numbers and '&' are kept, since they are significant if the author is an organization and not a person.
        Finally, hyphens are normalized, HTML entities are decoded, and extra spaces are removed.

        :returns: str -- The cleaned string.
        '''
        unwanted_characters = {'[', ']', ';', '?'}
        clean_string = str()
        for i, c in enumerate(self.string):
            if c == '.':
                if self.string[i-1].isalpha():
                    clean_string += c
            elif c not in unwanted_characters:
                clean_string += c
        clean_string = ' '.join(clean_string.split()).strip()
        clean_string = html.unescape(clean_string)
        clean_string = Cleaner(clean_string).normalize_hyphens()
        return clean_string
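
    # Illustrative usage (example values only):
    #   Cleaner('Doe; J. [?]').remove_unwanted_characters()   # -> 'Doe J.' (the dot is kept: it follows a letter)
    #   Cleaner('Acme & Sons?').remove_unwanted_characters()  # -> 'Acme & Sons'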

    @staticmethod
    def clean_ra_list(ra_list:list) -> list:
        '''
        This method removes responsible agents reported as 'Not Available', duplicates with the same name and at least one matching identifier, and common identifiers among different names.

        :returns: list -- The cleaned responsible agents' list
        '''

        # Step 1: Collect all identifiers for each unique name
        agents_ids = OrderedDict()
        for ra in ra_list:
            if ra.lower().strip() == 'not available':
                continue
            match = re.compile(name_and_ids).match(ra)
            if match:
                name, ids = match.groups()
                if name:
                    cleaner = Cleaner(name)
                    cleaned_name = cleaner.remove_unwanted_characters()
                    agents_ids.setdefault(cleaned_name, OrderedDict()).update(
                        OrderedDict.fromkeys(ids.split()))
                else:  # If there are only IDs, treat the whole string as an identifier
                    agents_ids.setdefault(ra, OrderedDict()).update(
                        OrderedDict.fromkeys(ids.split()))

        # Step 2: Find identifiers that are shared between different names
        shared_ids = set()
        for name, ids in agents_ids.items():
            for other_name, other_ids in agents_ids.items():
                if name != other_name:
                    shared_ids.update(ids.keys() & other_ids.keys())

        # Step 3: Remove shared identifiers from the responsible agents
        for name, ids in agents_ids.items():
            agents_ids[name] = OrderedDict((id_key, None) for id_key in ids if id_key not in shared_ids)

        # Step 4: Clean the list from 'Not Available', duplicates, and shared identifiers
        new_ra_list = []
        seen_agents = OrderedDict()
        for ra in ra_list:
            if ra.lower().strip() == 'not available':
                continue
            if ',' in ra:
                split_name = re.split(comma_and_spaces, ra)
                first_name = split_name[1].strip() if split_name[1].strip().lower() != 'not available' else ''
                last_name = split_name[0].strip() if split_name[0].strip().lower() != 'not available' else ''
                if not last_name:
                    continue
                ra_cleaned_name = f'{last_name}, {first_name}' if first_name else f'{last_name}, '
            else:
                ra_cleaned_name = ra
            match = re.compile(name_and_ids).match(ra)
            if match:
                name, ids = match.groups()
                if name:
                    cleaner = Cleaner(name)
                    cleaned_name = cleaner.remove_unwanted_characters()
                    cleaned_ids = ' '.join(agents_ids.get(cleaned_name, []))
                    cleaned_ids_set = set(cleaned_ids.split())
                    ra_cleaned = f'{cleaned_name} [{cleaned_ids}]' if cleaned_ids else cleaned_name
                    if cleaned_name in seen_agents and seen_agents[cleaned_name] & cleaned_ids_set:
                        continue  # Skip adding this ra since it's a duplicate with a matching identifier
                    seen_agents.setdefault(cleaned_name, set()).update(cleaned_ids_set)
                else:
                    cleaned_ids = [identifier for identifier in ids.split() if identifier not in shared_ids]
                    ra_cleaned = f"[{' '.join(cleaned_ids)}]"
                new_ra_list.append(ra_cleaned)
            else:
                new_ra_list.append(ra_cleaned_name)
        return new_ra_list
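
    # Illustrative usage (example values only; this assumes that the name_and_ids
    # pattern captures a name followed by its bracketed identifiers):
    #   Cleaner.clean_ra_list([
    #       'Not Available',
    #       'Doe, Jane [orcid:0000-0000-0000-0000]',
    #       'Doe, Jane [orcid:0000-0000-0000-0000]'])
    #   # -> ['Doe, Jane [orcid:0000-0000-0000-0000]']
    #   # 'Not Available' is dropped and the duplicate with a matching identifier is removed.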

    def normalize_id(self, valid_dois_cache:dict=dict()) -> Union[str, None]:
        '''
        This function verifies and normalizes identifiers whose schema corresponds to a DOI, an ISSN, an ISBN or an ORCID.

        :returns: Union[str, None] -- The normalized identifier if it is valid, None otherwise
        '''
        identifier = self.string.split(':', 1)
        schema = identifier[0].lower()
        value = identifier[1]
        if schema == 'doi':
            doi_manager = DOIManager(use_api_service=False, storage_manager=None)
            valid_id = doi_manager.normalise(value, include_prefix=True) if doi_manager.syntax_ok(value) else None
        elif schema == 'isbn':
            isbn_manager = ISBNManager()
            valid_id = isbn_manager.normalise(value, include_prefix=True) if isbn_manager.is_valid(value, get_extra_info=False) else None
        elif schema == 'issn':
            if value == '0000-0000':
                valid_id = None
            else:
                issn_manager = ISSNManager()
                try:
                    valid_id = issn_manager.normalise(value, include_prefix=True) if issn_manager.is_valid(value, get_extra_info=False) else None
                except ValueError:
                    print(value)
                    raise
        elif schema == 'orcid':
            orcid_manager = ORCIDManager(use_api_service=False, storage_manager=None)
            valid_id = orcid_manager.normalise(value, include_prefix=True) if orcid_manager.is_valid(value, get_extra_info=False) else None
        else:
            valid_id = f'{schema}:{value}'
        return valid_id
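
    # Illustrative usage (example values only; syntactic validity is delegated to the
    # oc_idmanager managers, so actual results depend on that library):
    #   Cleaner('doi:10.1000/182').normalize_id()  # -> 'doi:10.1000/182' if the DOI syntax is accepted
    #   Cleaner('issn:0000-0000').normalize_id()   # -> None (explicitly rejected above)
    #   Cleaner('wikidata:Q42').normalize_id()     # unknown schema -> returned unchanged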

    @classmethod
    def clean_volume_and_issue(cls, row:dict) -> None:
        output = {'volume': '', 'issue': '', 'pub_date': ''}
        for field in {'volume', 'issue'}:
            vi = row[field]
            vi = Cleaner(vi).normalize_hyphens()
            vi = Cleaner(vi).normalize_spaces().strip()
            vi = html.unescape(vi)
            for pattern, strategy in invalid_vi_patterns.items():
                pattern = f'^{pattern}$'
                capturing_groups = re.search(pattern, vi, re.IGNORECASE)
                if capturing_groups:
                    if strategy == 'del':
                        row[field] = ''
                    elif strategy == 'do_nothing':
                        row[field] = vi
                    elif strategy == 's)':
                        row[field] = f'{vi}s)'
                    else:
                        row[field] = ''
                        whatever, volume, issue, pub_date = cls.fix_invalid_vi(capturing_groups, strategy)
                        row[field] = whatever if whatever else row[field]
                        output['volume'] = volume if volume else ''
                        output['issue'] = issue if issue else ''
                        output['pub_date'] = pub_date if pub_date else ''
        row['volume'] = output['volume'] if not row['volume'] else row['volume']
        row['issue'] = output['issue'] if not row['issue'] else row['issue']
        row['pub_date'] = output['pub_date'] if not row['pub_date'] else row['pub_date']
        switch_vi = {'volume': '', 'issue': ''}
        for field in {'volume', 'issue'}:
            vi = row[field]
            for pattern in volumes_valid_patterns:
                pattern = f'^{pattern}$'
                if re.search(pattern, vi, re.IGNORECASE):
                    if field == 'issue':
                        switch_vi['volume'] = vi
            for pattern in issues_valid_patterns:
                pattern = f'^{pattern}$'
                if re.search(pattern, vi, re.IGNORECASE):
                    if field == 'volume':
                        switch_vi['issue'] = vi
        if switch_vi['volume'] and switch_vi['issue']:
            row['volume'] = switch_vi['volume']
            row['issue'] = switch_vi['issue']
        elif switch_vi['volume'] and not row['volume']:
            row['volume'] = switch_vi['volume']
            row['issue'] = ''
            row['type'] = 'journal volume' if row['type'] == 'journal issue' else row['type']
        elif switch_vi['issue'] and not row['issue']:
            row['issue'] = switch_vi['issue']
            row['volume'] = ''
            row['type'] = 'journal issue' if row['type'] == 'journal volume' else row['type']
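
    # Illustrative usage (the row is modified in place; which values are deleted, kept,
    # fixed or swapped depends on the patterns defined in oc_meta.lib.master_of_regex,
    # so no exact output is claimed here). The row needs the 'volume', 'issue',
    # 'pub_date' and 'type' keys; a value recognized as a volume found in the 'issue'
    # field (or vice versa) is moved to the right field and 'type' is adjusted:
    #   row = {'volume': '', 'issue': 'Volume 15', 'pub_date': '', 'type': 'journal issue'}
    #   Cleaner.clean_volume_and_issue(row)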

    @staticmethod
    def fix_invalid_vi(capturing_groups:re.Match, strategy:str) -> Tuple[str, str, str, str]:
        vol_group = 1 if 'vol_iss' in strategy else 2
        iss_group = 1 if 'iss_vol' in strategy else 2
        whatever = None
        volume = None
        issue = None
        pub_date = None
        if 'vol' in strategy and 'iss' in strategy:
            volume = capturing_groups.group(vol_group)
            issue = capturing_groups.group(iss_group)
            if 'year' in strategy:
                pub_date = capturing_groups.group(3)
        elif strategy == 'all':
            whatever = capturing_groups.group(1)
        elif strategy == 'sep':
            first = capturing_groups.group(1)
            second = capturing_groups.group(2)
            whatever = f'{first}-{second}'
        return whatever, volume, issue, pub_date

    def remove_ascii(self):
        unwanted_chars = '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f\x7f\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff'
        clean_string = ''.join([' ' if c in unwanted_chars else c for c in self.string])
        clean_string = ' '.join(clean_string.split())
        return clean_string
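
    # Illustrative usage (example values only). Despite its name, the method does not
    # strip ASCII text: it replaces control characters (U+0000-U+001F, U+007F) and every
    # character from U+0080 to U+00FF with a space, then collapses the whitespace;
    # characters above U+00FF are left untouched.
    #   Cleaner('naïve\ttext').remove_ascii()  # -> 'na ve text'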