Coverage for oc_meta/lib/cleaner.py: 96%

259 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright 2019 Silvio Peroni <essepuntato@gmail.com> 

4# Copyright 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com> 

5# Copyright 2021 Simone Persiani <iosonopersia@gmail.com> 

6# Copyright 2021-2022 Arcangelo Massari <arcangelo.massari@unibo.it> 

7# 

8# Permission to use, copy, modify, and/or distribute this software for any purpose 

9# with or without fee is hereby granted, provided that the above copyright notice 

10# and this permission notice appear in all copies. 

11# 

12# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

13# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

14# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

15# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

16# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

17# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

18# SOFTWARE. 

19 

20import html 

21import re 

22from collections import OrderedDict 

23from datetime import datetime 

24from typing import Tuple, Union 

25 

26from dateutil.parser import parse 

27from oc_ds_converter.oc_idmanager import (DOIManager, ISBNManager, ISSNManager, 

28 ORCIDManager) 

29 

30from oc_meta.lib.master_of_regex import * 

31from oc_meta.lib.master_of_regex import (invalid_vi_patterns, 

32 issues_valid_patterns, 

33 volumes_valid_patterns) 

34 

35 

class Cleaner:
    '''
    A utility class that wraps a single string and offers cleaning/normalization
    methods for bibliographic metadata: titles, agent names, dates, identifiers,
    and volume/issue designations.
    '''

    def __init__(self, string:str):
        '''
        :params string: the string to be cleaned.
        :type string: str
        '''
        self.string = string

    def normalize_hyphens(self) -> str:
        '''
        It replaces any hyphen, dash and minus sign with a hyphen-minus character.
        This is done for pages, IDs and dates.

        Characters normalized to U+002D (Hyphen-minus):

        * U+00AD -- Soft hyphen
        * U+06D4 -- Arabic Full Stop
        * U+2010 -- Hyphen
        * U+2011 -- Non-breaking Hyphen
        * U+2012 -- Figure Dash
        * U+2013 -- En-Dash
        * U+2014 -- Em-Dash
        * U+2043 -- Hyphen Bullet
        * U+2212 -- Minus Sign
        * U+2796 -- Heavy Minus Sign
        * U+2CBA -- Coptic Capital Letter Dialect-p Ni
        * U+FE58 -- Small Em Dash

        If the string contains an 'isbn:' prefix, all hyphens are removed,
        since ISBNs are stored without separators.

        :returns: str -- the string with normalized hyphens
        '''
        string = self.string
        wrong_characters = {'\u00AD', '\u06D4', '\u2010', '\u2011', '\u2012', '\u2013', '\u2014', '\u2043', '\u2212', '\u2796', '\u2CBA', '\uFE58'}
        for c in wrong_characters:
            string = string.replace(c, '\u002D')
        if 'isbn:' in string:
            # FIX: the result of str.replace() was previously discarded
            # (strings are immutable), so ISBN hyphens were never removed.
            string = string.replace('\u002D', '')
        return string

    def normalize_spaces(self) -> str:
        '''
        It replaces any ambiguous space character with a plain space (U+0020).

        Characters normalized to U+0020 (Space):

        * U+0009 -- Character Tabulation
        * U+00A0 -- No-break space (also its HTML entity '&nbsp;')
        * U+200B -- Zero width space
        * U+202F -- Narrow no-break space
        * U+2003 -- Em Space
        * U+2005 -- Four-Per-Em Space
        * U+2009 -- Thin Space

        :returns: str -- the string with normalized spaces
        '''
        string = self.string
        wrong_characters = {'\u0009', '\u00A0', '&nbsp;', '\u200B', '\u202F', '\u2003', '\u2005', '\u2009'}
        for c in wrong_characters:
            string = string.replace(c, '\u0020')
        return string

    def clean_title(self, normalize_titles:bool=True) -> str:
        '''
        Concerning titles of bibliographic resources ('venue' and 'title' columns),
        every word in the title is capitalized except for those that have capitals within them
        (probably acronyms, e.g. 'FaBiO and CiTO'). This exception, however, does not include entirely capitalized titles.
        Finally, null characters and spaces are removed.

        :param normalize_titles: if False, the title is returned unchanged.
        :type normalize_titles: bool
        :returns: str -- The cleaned title
        '''
        title = self.string
        if normalize_titles:
            if title.isupper():
                # Entirely capitalized titles are lowered first so they get
                # per-word capitalization instead of being treated as acronyms
                title = title.lower()
            words = title.split()
            for i, w in enumerate(words):
                # Words containing any capital (acronyms) are left untouched
                if not any(x.isupper() for x in w):
                    words[i] = w.title()
            new_title = ' '.join(words)
        else:
            new_title = title
        return new_title

    def __date_parse_hack(self, date:str) -> datetime:
        '''
        Detects which date components (year/month/day) are actually present in
        the input by parsing it twice with different defaults: a component that
        changes between the two parses was filled in by the default and is
        therefore absent from the input.

        :returns: str -- the date formatted with only its real components,
            or an empty string if not even the year is reliable
        '''
        dt = parse(date, default=datetime(2001, 1, 1))
        dt2 = parse(date, default=datetime(2002, 2, 2))

        if dt.year == dt2.year and dt.month == dt2.month and dt.day == dt2.day:
            clean_date = parse(date).strftime('%Y-%m-%d')
        elif dt.year == dt2.year and dt.month == dt2.month:
            clean_date = parse(date).strftime('%Y-%m')
        elif dt.year == dt2.year:
            clean_date = parse(date).strftime('%Y')
        else:
            clean_date = ''
        return clean_date

    def clean_date(self) -> str:
        '''
        It tries to parse a date-string into a datetime object,
        considering both the validity of the format (YYYY-MM-DD) and the value (e.g. 30 February is not a valid date).
        For example, a date 2020-02-30 will become 2020-02, because the day is invalid.
        On the other hand, 2020-27-12 will become 2020 since the day
        and month are invalid.
        If the year is not valid (e.g. year > 9999) data would be totally discarded.

        :returns: str -- The cleaned date or an empty string
        '''
        date = self.string
        try:
            date = self.__date_parse_hack(date)
        except ValueError:
            try:
                # e.g. 2021-12-17
                if len(date) == 10:
                    try:
                        # Maybe only the day is invalid, try year-month
                        new_date = date[:-3]
                        date = self.__date_parse_hack(new_date)
                    except ValueError:
                        try:
                            # Maybe only the month is invalid, try year
                            new_date = date[:-6]
                            date = self.__date_parse_hack(new_date)
                        except ValueError:
                            date = ''
                # e.g. 2021-12
                elif len(date) == 7:
                    # Maybe only the month is invalid, try year
                    try:
                        new_date = date[:-3]
                        date = self.__date_parse_hack(new_date)
                    except ValueError:
                        date = ''
                else:
                    date = ''
            except ValueError:
                date = ''
        return date

    def clean_name(self) -> str:
        '''
        The first letter of each element of the name is capitalized and superfluous spaces are removed.
        Names in 'Surname, Given' form keep the comma layout; if the surname part
        is empty, an empty string is returned.

        :returns: str -- The cleaned name
        '''
        name = self.string
        if ',' in name:
            split_name = re.split(comma_and_spaces, name)
            first_name = split_name[1].split()
            for i, w in enumerate(first_name):
                first_name[i] = Cleaner(w).clean_title()
            new_first_name = ' '.join(first_name)
            surname = split_name[0].split()
            for i, w in enumerate(surname):
                surname[i] = Cleaner(w).clean_title()
            new_surname = ' '.join(surname)
            if new_surname:
                new_name = new_surname + ', ' + new_first_name
            else:
                new_name = ''
        else:
            split_name = name.split()
            for i, w in enumerate(split_name):
                split_name[i] = Cleaner(w).clean_title()
            new_name = ' '.join(split_name)
        return new_name

    def remove_unwanted_characters(self) -> str:
        '''
        This method helps remove unwanted characters from a string.
        Such characters are all characters other than letters, numbers, space, '&', apostrophe, or dots that are not preceded by letters.
        Numbers and '&' are significant if the author is an organization and not a person.
        Finally, hyphens are normalized, Unicode encodings decoded, and extra spaces removed.

        :returns: str -- The cleaned string.
        '''
        unwanted_characters = {'[', ']', ';', '?'}
        clean_string = str()
        for i, c in enumerate(self.string):
            if c == '.':
                # FIX: guard i > 0 -- previously self.string[i-1] at i == 0
                # wrapped around to the LAST character, so a leading dot was
                # kept whenever the string ended with a letter.
                if i > 0 and self.string[i-1].isalpha():
                    clean_string += c
            elif c not in unwanted_characters:
                clean_string += c
        clean_string = ' '.join(clean_string.split()).strip()
        clean_string = html.unescape(clean_string)
        clean_string = Cleaner(clean_string).normalize_hyphens()
        return clean_string

    @staticmethod
    def clean_ra_list(ra_list:list) -> list:
        '''
        This method removes responsible agents reported as 'Not Available', duplicates with the same name and at least one matching identifier, and common identifiers among different names.

        :param ra_list: the list of responsible agent strings, each in the
            form 'Name [id1 id2 ...]' or '[id1 id2 ...]'
        :type ra_list: list
        :returns: list -- The cleaned responsible agents' list
        '''

        # Step 1: Collect all identifiers for each unique name
        agents_ids = OrderedDict()
        for ra in ra_list:
            if ra.lower().strip() == 'not available':
                continue
            match = re.compile(name_and_ids).match(ra)
            if match:
                name, ids = match.groups()
                if name:
                    cleaner = Cleaner(name)
                    cleaned_name = cleaner.remove_unwanted_characters()
                    agents_ids.setdefault(cleaned_name, OrderedDict()).update(
                        OrderedDict.fromkeys(ids.split()))
                else: # If there are only IDs, treat the whole string as an identifier
                    agents_ids.setdefault(ra, OrderedDict()).update(
                        OrderedDict.fromkeys(ids.split()))
        # Step 2: Find identifiers that are shared between different names
        shared_ids = set()
        for name, ids in agents_ids.items():
            for other_name, other_ids in agents_ids.items():
                if name != other_name:
                    shared_ids.update(ids.keys() & other_ids.keys())

        # Step 3: Remove shared identifiers from the responsible agents
        for name, ids in agents_ids.items():
            agents_ids[name] = OrderedDict((id_key, None) for id_key in ids if id_key not in shared_ids)

        # Step 4: Clean the list from 'Not Available', duplicates, and shared identifiers
        new_ra_list = []
        seen_agents = OrderedDict()
        for ra in ra_list:
            if ra.lower().strip() == 'not available':
                continue
            if ',' in ra:
                split_name = re.split(comma_and_spaces, ra)
                first_name = split_name[1].strip() if split_name[1].strip().lower() != 'not available' else ''
                last_name = split_name[0].strip() if split_name[0].strip().lower() != 'not available' else ''
                if not last_name:
                    # A missing surname invalidates the whole agent
                    continue
                ra_cleaned_name = f'{last_name}, {first_name}' if first_name else f'{last_name}, '
            else:
                ra_cleaned_name = ra
            match = re.compile(name_and_ids).match(ra)
            if match:
                name, ids = match.groups()
                if name:
                    cleaner = Cleaner(name)
                    cleaned_name = cleaner.remove_unwanted_characters()
                    cleaned_ids = ' '.join(agents_ids.get(cleaned_name, []))
                    cleaned_ids_set = set(cleaned_ids.split())
                    ra_cleaned = f'{cleaned_name} [{cleaned_ids}]' if cleaned_ids else cleaned_name
                    if cleaned_name in seen_agents and seen_agents[cleaned_name] & cleaned_ids_set:
                        continue # Skip adding this ra since it's a duplicate with a matching identifier
                    seen_agents.setdefault(cleaned_name, set()).update(cleaned_ids_set)
                else:
                    cleaned_ids = [identifier for identifier in ids.split() if identifier not in shared_ids]
                    ra_cleaned = f"[{' '.join(cleaned_ids)}]"
                new_ra_list.append(ra_cleaned)
            else:
                new_ra_list.append(ra_cleaned_name)
        return new_ra_list

    def normalize_id(self, valid_dois_cache:dict=None) -> Union[str, None]:
        '''
        This function verifies and normalizes identifiers whose schema corresponds to a DOI, an ISSN, an ISBN or an ORCID.
        The string is expected to be in the form 'schema:value'.

        :param valid_dois_cache: currently unused; kept for backward
            compatibility with existing callers. FIX: the default was a
            mutable ``dict()`` shared across calls; it is now ``None``.
        :type valid_dois_cache: dict
        :returns: Union[str, None] -- The normalized identifier if it is valid, None otherwise
        '''
        identifier = self.string.split(':', 1)
        schema = identifier[0].lower()
        value = identifier[1]
        if schema == 'doi':
            doi_manager = DOIManager(use_api_service=False, storage_manager=None)
            valid_id = doi_manager.normalise(value, include_prefix=True) if doi_manager.syntax_ok(value) else None
        elif schema == 'isbn':
            isbn_manager = ISBNManager()
            valid_id = isbn_manager.normalise(value, include_prefix=True) if isbn_manager.is_valid(value, get_extra_info=False) else None
        elif schema == 'issn':
            if value == '0000-0000':
                # Placeholder ISSN, always invalid
                valid_id = None
            else:
                issn_manager = ISSNManager()
                try:
                    valid_id = issn_manager.normalise(value, include_prefix=True) if issn_manager.is_valid(value, get_extra_info=False) else None
                except ValueError:
                    print(value)
                    # FIX: was `raise(ValueError)`, which raised a new
                    # message-less exception; re-raise the original instead,
                    # preserving its message and traceback.
                    raise
        elif schema == 'orcid':
            orcid_manager = ORCIDManager(use_api_service=False, storage_manager=None)
            valid_id = orcid_manager.normalise(value, include_prefix=True) if orcid_manager.is_valid(value, get_extra_info=False) else None
        else:
            # Unknown schemas are passed through untouched
            valid_id = f'{schema}:{value}'
        return valid_id

    @classmethod
    def clean_volume_and_issue(cls, row:dict) -> None:
        '''
        Normalizes the 'volume' and 'issue' fields of a row in place: hyphens
        and spaces are normalized, HTML entities decoded, invalid patterns
        fixed or discarded, and values that clearly belong to the other field
        (a volume stored as issue, or vice versa) are switched, adjusting
        row['type'] accordingly. May also recover a publication date embedded
        in the volume/issue string.

        :param row: a row with at least 'volume', 'issue', 'pub_date' and 'type' keys
        :type row: dict
        :returns: None -- the row is modified in place
        '''
        output = {'volume': '', 'issue': '', 'pub_date': ''}
        # A tuple (not a set) so the processing order is deterministic
        for field in ('volume', 'issue'):
            vi = row[field]
            vi = Cleaner(vi).normalize_hyphens()
            vi = Cleaner(vi).normalize_spaces().strip()
            vi = html.unescape(vi)
            for pattern, strategy in invalid_vi_patterns.items():
                pattern = f'^{pattern}$'
                capturing_groups = re.search(pattern, vi, re.IGNORECASE)
                if capturing_groups:
                    if strategy == 'del':
                        row[field] = ''
                    elif strategy == 'do_nothing':
                        row[field] = vi
                    elif strategy == 's)':
                        row[field] = f'{vi}s)'
                    else:
                        row[field] = ''
                        whatever, volume, issue, pub_date = cls.fix_invalid_vi(capturing_groups, strategy)
                        row[field] = whatever if whatever else row[field]
                        output['volume'] = volume if volume else ''
                        output['issue'] = issue if issue else ''
                        output['pub_date'] = pub_date if pub_date else ''
        # Recovered values only fill fields that are still empty
        row['volume'] = output['volume'] if not row['volume'] else row['volume']
        row['issue'] = output['issue'] if not row['issue'] else row['issue']
        row['pub_date'] = output['pub_date'] if not row['pub_date'] else row['pub_date']
        switch_vi = {'volume': '', 'issue': ''}
        for field in ('volume', 'issue'):
            vi = row[field]
            for pattern in volumes_valid_patterns:
                pattern = f'^{pattern}$'
                if re.search(pattern, vi, re.IGNORECASE):
                    if field == 'issue':
                        # An issue field holding a volume-like value
                        switch_vi['volume'] = vi
            for pattern in issues_valid_patterns:
                pattern = f'^{pattern}$'
                if re.search(pattern, vi, re.IGNORECASE):
                    if field == 'volume':
                        # A volume field holding an issue-like value
                        switch_vi['issue'] = vi
        if switch_vi['volume'] and switch_vi['issue']:
            row['volume'] = switch_vi['volume']
            row['issue'] = switch_vi['issue']
        elif switch_vi['volume'] and not row['volume']:
            row['volume'] = switch_vi['volume']
            row['issue'] = ''
            row['type'] = 'journal volume' if row['type'] == 'journal issue' else row['type']
        elif switch_vi['issue'] and not row['issue']:
            row['issue'] = switch_vi['issue']
            row['volume'] = ''
            row['type'] = 'journal issue' if row['type'] == 'journal volume' else row['type']

    @staticmethod
    def fix_invalid_vi(capturing_groups:re.Match, strategy:str) -> Tuple[str, str, str, str]:
        '''
        Extracts volume, issue and publication date from the capturing groups
        of an invalid volume/issue pattern, according to the given strategy
        (e.g. 'vol_iss', 'iss_vol', optionally with 'year'; 'all'; 'sep').

        :returns: Tuple[str, str, str, str] -- (whatever, volume, issue, pub_date),
            each possibly None when the strategy does not produce it
        '''
        vol_group = 1 if 'vol_iss' in strategy else 2
        iss_group = 1 if 'iss_vol' in strategy else 2
        whatever = None
        volume = None
        issue = None
        pub_date = None
        if 'vol' in strategy and 'iss' in strategy:
            volume = capturing_groups.group(vol_group)
            issue = capturing_groups.group(iss_group)
            if 'year' in strategy:
                pub_date = capturing_groups.group(3)
        elif strategy == 'all':
            whatever = capturing_groups.group(1)
        elif strategy == 'sep':
            first = capturing_groups.group(1)
            second = capturing_groups.group(2)
            whatever = f'{first}-{second}'
        return whatever, volume, issue, pub_date

    def remove_ascii(self):
        '''
        Replaces every ASCII control character and every Latin-1 byte in the
        range 0x7F-0xFF with a space, then collapses runs of whitespace.

        :returns: str -- the cleaned string
        '''
        unwanted_chars = '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f\x7f\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff'
        clean_string = ''.join([' ' if c in unwanted_chars else c for c in self.string])
        clean_string = ' '.join(clean_string.split())
        return clean_string