Coverage for oc_ds_converter / lib / cleaner.py: 20%
218 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2019 Silvio Peroni <essepuntato@gmail.com>
2# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com>
3# SPDX-FileCopyrightText: 2021 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8import html
9import re
10from datetime import datetime
11from typing import Tuple, Union
13from dateutil.parser import parse
14from oc_ds_converter.oc_idmanager import DOIManager, ISBNManager, ISSNManager, ORCIDManager
16from oc_ds_converter.lib.master_of_regex import (
17 comma_and_spaces,
18 invalid_vi_patterns,
19 issues_valid_patterns,
20 volumes_valid_patterns,
21)
class Cleaner:
    '''
    A collection of normalization and cleaning routines for bibliographic
    metadata strings (titles, agent names, dates, identifiers, volume and
    issue designations). Each instance wraps a single string to be cleaned.
    '''
    def __init__(self, string:str):
        '''
        :param string: the string to be cleaned.
        :type string: str
        '''
        # Raw input string that every cleaning method reads from.
        self.string = string
32 def normalize_hyphens(self) -> str:
33 '''
34 It replaces any hyphen, dash and minus sign with a hyphen-minus character.
35 This is done for pages, IDs and dates.
37 .. list-table:: Comparison between the various characters similar to hyphen-minus
38 :widths: 25 25 50
39 :header-rows: 1
41 * - UTF-8
42 - SIGN
43 - NAME
44 * - U+002D
45 - -
46 - Hyphen-minus
47 * - U+00AD
48 -
49 - Soft hyphen
50 * - U+06D4
51 - ۔
52 - Arabic Full Stop
53 * - U+2010
54 - ‐
55 - Hyphen
56 * - U+2011
57 - −
58 - Non-breaking Hyphen
59 * - U+2012
60 - –
61 - Figure Dash
62 * - U+2013
63 - –
64 - En-Dash
65 * - U+2014
66 - —
67 - Em-Dash
68 * - U+2043
69 - ⁃
70 - Hyphen Bullet
71 * - U+2212
72 - −
73 - Minus Sign
74 * - U+2796
75 - ➖
76 - Heavy Minus Sign
77 * - U+2CBA
78 - Ⲻ
79 - Coptic Capital Letter Dialect-p Ni
80 * - U+FE58
81 - ﹘
82 - Small Em Dash
84 :returns: str -- the string with normalized hyphens
85 '''
86 string = self.string
87 wrong_characters = {'\u00AD', '\u06D4', '\u2010', '\u2011', '\u2012', '\u2013', '\u2014', '\u2043', '\u2212', '\u2796', '\u2CBA', '\uFE58'}
88 for c in wrong_characters:
89 string = string.replace(c, '\u002D')
90 if 'isbn:' in string:
91 string.replace(u'\u002D', '')
92 return string
94 def normalize_spaces(self) -> str:
95 '''
96 It replaces any ambiguous spaces with a space.
98 .. list-table:: List of the various characters similar to the space
99 :widths: 25 25 50
100 :header-rows: 1
102 * - UTF-8
103 - NAME
104 * - U+0020
105 - Space
106 * - U+0009
107 - Character Tabulation
108 * - U+00A0
109 - No-break space
110 * - U+200B
111 - Zero width space
112 * - U+202F
113 - Narrow no-break space
114 * - U+2003
115 - Em Space
116 * - U+2005
117 - Four-Per-Em Space
118 * - U+2009
119 - Thin Space
121 :returns: str -- the string with normalized spaces
122 '''
123 string = self.string
124 wrong_characters = {'\u0009', '\u00A0', ' ', '\u200B', '\u202F', '\u2003', '\u2005', '\u2009'}
125 for c in wrong_characters:
126 string = string.replace(c, '\u0020')
127 return string
129 def clean_title(self) -> str:
130 '''
131 Concerning titles of bibliographic resources ('venue' and 'title' columns),
132 every word in the title is capitalized except for those that have capitals within them
133 (probably acronyms, e.g. 'FaBiO and CiTO'). This exception, however, does not include entirely capitalized titles.
134 Finally, null characters and spaces are removed.
136 :returns: str -- The cleaned title
137 '''
138 title = self.string
139 if title.isupper():
140 title = title.lower()
141 words = title.split()
142 for i, w in enumerate(words):
143 if not any(x.isupper() for x in w):
144 words[i] = w.title()
145 new_title = ' '.join(words)
146 return new_title
148 def __date_parse_hack(self, date:str) -> datetime:
149 dt = parse(date, default=datetime(2001, 1, 1))
150 dt2 = parse(date, default=datetime(2002, 2, 2))
152 if dt.year == dt2.year and dt.month == dt2.month and dt.day == dt2.day:
153 clean_date = parse(date).strftime('%Y-%m-%d')
154 elif dt.year == dt2.year and dt.month == dt2.month:
155 clean_date = parse(date).strftime('%Y-%m')
156 elif dt.year == dt2.year:
157 clean_date = parse(date).strftime('%Y')
158 else:
159 clean_date = ''
160 return clean_date
162 def clean_date(self) -> str:
163 '''
164 It tries to parse a date-string into a datetime object,
165 considering both the validity of the format (YYYYY-MM-DD) and the value (e.g. 30 February is not a valid date).
166 For example, a date 2020-02-30 will become 2020-02, because the day is invalid.
167 On the other hand, 2020-27-12 will become 2020 since the day
168 and month are invalid.
169 If the year is not valid (e.g.year >9999) data would be totally discarded.
171 :returns: str -- The cleaned date or an empty string
172 '''
173 date = self.string
174 try:
175 date = self.__date_parse_hack(date)
176 except ValueError:
177 try:
178 # e.g. 2021-12-17
179 if len(date) == 10:
180 try:
181 # Maybe only the day is invalid, try year-month
182 new_date = date[:-3]
183 date = self.__date_parse_hack(new_date)
184 except ValueError:
185 try:
186 # Maybe only the month is invalid, try year
187 new_date = date[:-6]
188 date = self.__date_parse_hack(new_date)
189 except ValueError:
190 date = ''
191 # e.g. 2021-12
192 elif len(date) == 7:
193 # Maybe only the month is invalid, try year
194 try:
195 new_date = date[:-3]
196 date = self.__date_parse_hack(new_date)
197 except ValueError:
198 date = ''
199 else:
200 date = ''
201 except ValueError:
202 date = ''
203 return date
205 def clean_name(self) -> str:
206 '''
207 The first letter of each element of the name is capitalized and superfluous spaces are removed.
209 :returns: str -- The cleaned name
210 '''
211 name = self.string
212 if ',' in name:
213 split_name = re.split(comma_and_spaces, name)
214 first_name = split_name[1].split()
215 for i, w in enumerate(first_name):
216 first_name[i] = Cleaner(w).clean_title()
217 new_first_name = ' '.join(first_name)
218 surname = split_name[0].split()
219 for i, w in enumerate(surname):
220 surname[i] = Cleaner(w).clean_title()
221 new_surname = ' '.join(surname)
222 if new_surname:
223 new_name = new_surname + ', ' + new_first_name
224 else:
225 new_name = ''
226 else:
227 split_name = name.split()
228 for i, w in enumerate(split_name):
229 split_name[i] = Cleaner(w).clean_title()
230 new_name = ' '.join(split_name)
231 return new_name
233 def remove_unwanted_characters(self) -> str:
234 '''
235 This method helps remove unwanted characters from authors' names.
236 Such characters are all characters other than letters, numbers, space, '&', apostroph, or dots that are not preceded by letters.
237 Numbers and '&' are significant if the author is an organization and not a person.
238 Finally, hyphens are normalized, Unicode encodings decoded, and extra spaces removed.
240 :returns: str -- The cleaned name
241 '''
242 unwanted_characters = {'[', ']', '{', '}', '(', ')', '?', ';', ','}
243 clean_string = str()
244 for i, c in enumerate(self.string):
245 if c == '.':
246 if self.string[i-1].isalpha():
247 clean_string += c
248 elif c not in unwanted_characters:
249 clean_string += c
250 clean_string = ' '.join(clean_string.split()).strip()
251 clean_string = html.unescape(clean_string)
252 clean_string = Cleaner(clean_string).normalize_hyphens()
253 return clean_string
255 @staticmethod
256 def clean_ra_list(ra_list:list) -> list:
257 '''
258 This method removes responsible agents reported as 'Not Available'.
260 :returns: list -- The cleaned responsible agents' list
261 '''
262 new_ra_list = list()
263 for ra in ra_list:
264 if ',' in ra:
265 split_name = re.split(comma_and_spaces, ra)
266 first_name = split_name[1] if split_name[1].lower() != 'not available' else ''
267 given_name = split_name[0] if split_name[0].lower() != 'not available' else ''
268 if given_name:
269 if first_name:
270 new_ra_list.append(ra)
271 else:
272 new_ra_list.append(f'{given_name}, ')
273 else:
274 continue
275 else:
276 if ra.lower() != 'not available':
277 new_ra_list.append(ra)
278 return new_ra_list
281 def normalize_id(self, valid_dois_cache:dict=dict()) -> Union[str, None]:
282 '''
283 This function verifies and normalizes identifiers whose schema corresponds to a DOI, an ISSN, an ISBN or an ORCID.
285 :returns: Union[str, None] -- The normalized identifier if it is valid, None otherwise
286 '''
287 identifier = self.string.split(':', 1)
288 schema = identifier[0].lower()
289 value = identifier[1]
290 use_api_service = True if valid_dois_cache else False
291 validator = 'is_valid' if use_api_service else 'check_digit'
292 if schema == 'doi':
293 doi_manager = DOIManager(data=valid_dois_cache, use_api_service=use_api_service)
294 valid_id = doi_manager.normalise(value, include_prefix=True) if getattr(doi_manager, validator)(value) else None
295 elif schema == 'isbn':
296 isbn_manager = ISBNManager()
297 valid_id = isbn_manager.normalise(value, include_prefix=True) if getattr(isbn_manager, validator)(value) else None
298 elif schema == 'issn':
299 if value == '0000-0000':
300 valid_id = None
301 else:
302 issn_manager = ISSNManager()
303 valid_id = issn_manager.normalise(value, include_prefix=True) if getattr(issn_manager, validator)(value) else None
304 elif schema == 'orcid':
305 orcid_manager = ORCIDManager()
306 valid_id = orcid_manager.normalise(value, include_prefix=True) if getattr(orcid_manager, validator)(value) else None
307 else:
308 valid_id = f'{schema}:{value}'
309 return valid_id
    @classmethod
    def clean_volume_and_issue(cls, row:dict) -> None:
        '''
        Cleans the 'volume' and 'issue' fields of a bibliographic row
        in place, in three passes:

        1. each field is normalized (hyphens, spaces, HTML entities) and
           matched against the known invalid patterns; the pattern's
           strategy decides whether to delete the value, keep it, append
           's)', or re-extract volume/issue/pub_date via fix_invalid_vi;
        2. components recovered during pass 1 fill any still-empty field;
        3. if a value matches the valid pattern of the *other* field, the
           two are switched and 'type' is adjusted between
           'journal volume' and 'journal issue' accordingly.

        :param row: a dict with at least the keys 'volume', 'issue',
            'pub_date' and 'type'; modified in place
        :returns: None
        '''
        # Components recovered from invalid values, to be reassigned later.
        output = {'volume': '', 'issue': '', 'pub_date': ''}
        for field in {'volume', 'issue'}:
            vi = row[field]
            vi = Cleaner(vi).normalize_hyphens()
            vi = Cleaner(vi).normalize_spaces().strip()
            vi = html.unescape(vi)
            for pattern, strategy in invalid_vi_patterns.items():
                # Anchor the pattern so the whole value must match.
                pattern = f'^{pattern}$'
                capturing_groups = re.search(pattern, vi, re.IGNORECASE)
                if capturing_groups:
                    if strategy == 'del':
                        row[field] = ''
                    elif strategy == 'do_nothing':
                        row[field] = vi
                    elif strategy == 's)':
                        # Value was truncated before a closing 's)': restore it.
                        row[field] = f'{vi}s)'
                    else:
                        # Composite strategy: extract volume/issue/pub_date
                        # from the capturing groups.
                        row[field] = ''
                        whatever, volume, issue, pub_date = cls.fix_invalid_vi(capturing_groups, strategy)
                        row[field] = whatever if whatever else row[field]
                        output['volume'] = volume if volume else ''
                        output['issue'] = issue if issue else ''
                        output['pub_date'] = pub_date if pub_date else ''
        # Pass 2: recovered components only fill fields that are still empty.
        row['volume'] = output['volume'] if not row['volume'] else row['volume']
        row['issue'] = output['issue'] if not row['issue'] else row['issue']
        row['pub_date'] = output['pub_date'] if not row['pub_date'] else row['pub_date']
        # Pass 3: detect values that landed in the wrong field.
        switch_vi = {'volume': '', 'issue': ''}
        for field in {'volume', 'issue'}:
            vi = row[field]
            for pattern in volumes_valid_patterns:
                pattern = f'^{pattern}$'
                if re.search(pattern, vi, re.IGNORECASE):
                    if field == 'issue':
                        # An issue value that looks like a volume.
                        switch_vi['volume'] = vi
            for pattern in issues_valid_patterns:
                pattern = f'^{pattern}$'
                if re.search(pattern, vi, re.IGNORECASE):
                    if field == 'volume':
                        # A volume value that looks like an issue.
                        switch_vi['issue'] = vi
        if switch_vi['volume'] and switch_vi['issue']:
            # Both misplaced: swap them.
            row['volume'] = switch_vi['volume']
            row['issue'] = switch_vi['issue']
        elif switch_vi['volume'] and not row['volume']:
            row['volume'] = switch_vi['volume']
            row['issue'] = ''
            row['type'] = 'journal volume' if row['type'] == 'journal issue' else row['type']
        elif switch_vi['issue'] and not row['issue']:
            row['issue'] = switch_vi['issue']
            row['volume'] = ''
            row['type'] = 'journal issue' if row['type'] == 'journal volume' else row['type']
364 @staticmethod
365 def fix_invalid_vi(capturing_groups:re.Match, strategy:str) -> Tuple[str, str, str, str]:
366 vol_group = 1 if 'vol_iss' in strategy else 2
367 iss_group = 1 if 'iss_vol' in strategy else 2
368 whatever = None
369 volume = None
370 issue = None
371 pub_date = None
372 if 'vol' in strategy and 'iss' in strategy:
373 volume = capturing_groups.group(vol_group)
374 issue = capturing_groups.group(iss_group)
375 if 'year' in strategy:
376 pub_date = capturing_groups.group(3)
377 elif strategy == 'all':
378 whatever = capturing_groups.group(1)
379 elif strategy == 'sep':
380 first = capturing_groups.group(1)
381 second = capturing_groups.group(2)
382 whatever = f'{first}-{second}'
383 return whatever, volume, issue, pub_date
385 def remove_ascii(self):
386 unwanted_chars = '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f\x7f\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff'
387 clean_string = ''.join([' ' if c in unwanted_chars else c for c in self.string])
388 clean_string = ' '.join(clean_string.split())
389 return clean_string