Coverage for oc_ds_converter / lib / cleaner.py: 20%

218 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2019 Silvio Peroni <essepuntato@gmail.com> 

2# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com> 

3# SPDX-FileCopyrightText: 2021 Simone Persiani <iosonopersia@gmail.com> 

4# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8import html 

9import re 

10from datetime import datetime 

11from typing import Tuple, Union 

12 

13from dateutil.parser import parse 

14from oc_ds_converter.oc_idmanager import DOIManager, ISBNManager, ISSNManager, ORCIDManager 

15 

16from oc_ds_converter.lib.master_of_regex import ( 

17 comma_and_spaces, 

18 invalid_vi_patterns, 

19 issues_valid_patterns, 

20 volumes_valid_patterns, 

21) 

22 

23 

class Cleaner:
    def __init__(self, string: str):
        '''
        :param string: the string to be cleaned.
        :type string: str
        '''
        self.string = string

    def normalize_hyphens(self) -> str:
        '''
        It replaces any hyphen, dash and minus sign with a hyphen-minus character.
        This is done for pages, IDs and dates.

        .. list-table:: Comparison between the various characters similar to hyphen-minus
            :widths: 25 25 50
            :header-rows: 1

            * - UTF-8
              - SIGN
              - NAME
            * - U+002D
              - -
              - Hyphen-minus
            * - U+00AD
              - ­
              - Soft hyphen
            * - U+06D4
              - ۔
              - Arabic Full Stop
            * - U+2010
              - ‐
              - Hyphen
            * - U+2011
              - −
              - Non-breaking Hyphen
            * - U+2012
              - –
              - Figure Dash
            * - U+2013
              - –
              - En-Dash
            * - U+2014
              - —
              - Em-Dash
            * - U+2043
              - ⁃
              - Hyphen Bullet
            * - U+2212
              - −
              - Minus Sign
            * - U+2796
              - ➖
              - Heavy Minus Sign
            * - U+2CBA
              - Ⲻ
              - Coptic Capital Letter Dialect-p Ni
            * - U+FE58
              - ﹘
              - Small Em Dash

        :returns: str -- the string with normalized hyphens
        '''
        string = self.string
        wrong_characters = {'\u00AD', '\u06D4', '\u2010', '\u2011', '\u2012', '\u2013', '\u2014', '\u2043', '\u2212', '\u2796', '\u2CBA', '\uFE58'}
        for c in wrong_characters:
            string = string.replace(c, '\u002D')
        if 'isbn:' in string:
            # BUGFIX: the result of str.replace() was previously discarded
            # (strings are immutable), making this branch a no-op.
            # ISBN identifiers are meant to be stored without hyphens.
            string = string.replace('\u002D', '')
        return string

    def normalize_spaces(self) -> str:
        '''
        It replaces any ambiguous spaces with a space.

        .. list-table:: List of the various characters similar to the space
            :widths: 25 25 50
            :header-rows: 1

            * - UTF-8
              - NAME
            * - U+0020
              - Space
            * - U+0009
              - Character Tabulation
            * - U+00A0
              - No-break space
            * - U+200B
              - Zero width space
            * - U+202F
              - Narrow no-break space
            * - U+2003
              - Em Space
            * - U+2005
              - Four-Per-Em Space
            * - U+2009
              - Thin Space

        :returns: str -- the string with normalized spaces
        '''
        string = self.string
        # '&nbsp;' is a multi-character HTML entity, but str.replace handles it
        # the same way as the single-codepoint entries.
        wrong_characters = {'\u0009', '\u00A0', '&nbsp;', '\u200B', '\u202F', '\u2003', '\u2005', '\u2009'}
        for c in wrong_characters:
            string = string.replace(c, '\u0020')
        return string

    def clean_title(self) -> str:
        '''
        Concerning titles of bibliographic resources ('venue' and 'title' columns),
        every word in the title is capitalized except for those that have capitals within them
        (probably acronyms, e.g. 'FaBiO and CiTO'). This exception, however, does not include entirely capitalized titles.
        Finally, null characters and spaces are removed.

        :returns: str -- The cleaned title
        '''
        title = self.string
        # An entirely uppercase title is first lowercased so that it is
        # re-capitalized word by word below (the acronym exception must not apply).
        if title.isupper():
            title = title.lower()
        words = title.split()
        for i, w in enumerate(words):
            # Words containing any capital letter are assumed to be acronyms
            # (e.g. 'FaBiO') and are left untouched.
            if not any(x.isupper() for x in w):
                words[i] = w.title()
        new_title = ' '.join(words)
        return new_title

    def __date_parse_hack(self, date: str) -> str:
        # Parse the same string twice with two different defaults: any field
        # (year/month/day) that differs between the two results was missing
        # from the input and filled in by the default, so it must not be kept.
        dt = parse(date, default=datetime(2001, 1, 1))
        dt2 = parse(date, default=datetime(2002, 2, 2))

        if dt.year == dt2.year and dt.month == dt2.month and dt.day == dt2.day:
            clean_date = parse(date).strftime('%Y-%m-%d')
        elif dt.year == dt2.year and dt.month == dt2.month:
            clean_date = parse(date).strftime('%Y-%m')
        elif dt.year == dt2.year:
            clean_date = parse(date).strftime('%Y')
        else:
            # Not even the year is reliably present: discard the date.
            clean_date = ''
        return clean_date

    def clean_date(self) -> str:
        '''
        It tries to parse a date-string into a datetime object,
        considering both the validity of the format (YYYY-MM-DD) and the value (e.g. 30 February is not a valid date).
        For example, a date 2020-02-30 will become 2020-02, because the day is invalid.
        On the other hand, 2020-27-12 will become 2020 since the day
        and month are invalid.
        If the year is not valid (e.g. year > 9999) data would be totally discarded.

        :returns: str -- The cleaned date or an empty string
        '''
        date = self.string
        try:
            date = self.__date_parse_hack(date)
        except ValueError:
            # Progressively truncate the date and retry with less precision.
            try:
                # e.g. 2021-12-17
                if len(date) == 10:
                    try:
                        # Maybe only the day is invalid, try year-month
                        new_date = date[:-3]
                        date = self.__date_parse_hack(new_date)
                    except ValueError:
                        try:
                            # Maybe only the month is invalid, try year
                            new_date = date[:-6]
                            date = self.__date_parse_hack(new_date)
                        except ValueError:
                            date = ''
                # e.g. 2021-12
                elif len(date) == 7:
                    # Maybe only the month is invalid, try year
                    try:
                        new_date = date[:-3]
                        date = self.__date_parse_hack(new_date)
                    except ValueError:
                        date = ''
                else:
                    date = ''
            except ValueError:
                date = ''
        return date

    def clean_name(self) -> str:
        '''
        The first letter of each element of the name is capitalized and superfluous spaces are removed.

        :returns: str -- The cleaned name
        '''
        name = self.string
        if ',' in name:
            # 'Surname, First Name' form: clean the two halves separately.
            split_name = re.split(comma_and_spaces, name)
            first_name = split_name[1].split()
            for i, w in enumerate(first_name):
                first_name[i] = Cleaner(w).clean_title()
            new_first_name = ' '.join(first_name)
            surname = split_name[0].split()
            for i, w in enumerate(surname):
                surname[i] = Cleaner(w).clean_title()
            new_surname = ' '.join(surname)
            if new_surname:
                new_name = new_surname + ', ' + new_first_name
            else:
                # A first name without a surname is not a usable name.
                new_name = ''
        else:
            split_name = name.split()
            for i, w in enumerate(split_name):
                split_name[i] = Cleaner(w).clean_title()
            new_name = ' '.join(split_name)
        return new_name

    def remove_unwanted_characters(self) -> str:
        '''
        This method helps remove unwanted characters from authors' names.
        Such characters are all characters other than letters, numbers, space, '&', apostroph, or dots that are not preceded by letters.
        Numbers and '&' are significant if the author is an organization and not a person.
        Finally, hyphens are normalized, Unicode encodings decoded, and extra spaces removed.

        :returns: str -- The cleaned name
        '''
        unwanted_characters = {'[', ']', '{', '}', '(', ')', '?', ';', ','}
        clean_string = str()
        for i, c in enumerate(self.string):
            if c == '.':
                # Keep a dot only if preceded by a letter (e.g. an initial, 'J.').
                # BUGFIX: the 'i > 0' guard prevents self.string[-1] wraparound,
                # which previously kept a LEADING dot whenever the string's
                # last character happened to be a letter.
                if i > 0 and self.string[i - 1].isalpha():
                    clean_string += c
            elif c not in unwanted_characters:
                clean_string += c
        clean_string = ' '.join(clean_string.split()).strip()
        clean_string = html.unescape(clean_string)
        clean_string = Cleaner(clean_string).normalize_hyphens()
        return clean_string

    @staticmethod
    def clean_ra_list(ra_list: list) -> list:
        '''
        This method removes responsible agents reported as 'Not Available'.

        :returns: list -- The cleaned responsible agents' list
        '''
        new_ra_list = list()
        for ra in ra_list:
            if ',' in ra:
                # 'Family Name, Given Name' form: drop whichever half is 'Not Available'.
                split_name = re.split(comma_and_spaces, ra)
                family_name = split_name[0] if split_name[0].lower() != 'not available' else ''
                given_name = split_name[1] if split_name[1].lower() != 'not available' else ''
                if family_name:
                    if given_name:
                        new_ra_list.append(ra)
                    else:
                        # Family name only: keep the trailing comma form.
                        new_ra_list.append(f'{family_name}, ')
                else:
                    # A given name without a family name is discarded entirely.
                    continue
            else:
                if ra.lower() != 'not available':
                    new_ra_list.append(ra)
        return new_ra_list

    def normalize_id(self, valid_dois_cache: dict = None) -> Union[str, None]:
        '''
        This function verifies and normalizes identifiers whose schema corresponds to a DOI, an ISSN, an ISBN or an ORCID.

        :param valid_dois_cache: an optional cache of already-validated DOIs.
            When non-empty, validation is delegated to the remote API service;
            otherwise only the local check-digit validation is performed.
        :type valid_dois_cache: dict
        :returns: Union[str, None] -- The normalized identifier if it is valid, None otherwise
        '''
        # BUGFIX: the default used to be a mutable 'dict()' shared across calls;
        # a None sentinel is backward compatible (an empty dict is falsy too).
        if valid_dois_cache is None:
            valid_dois_cache = dict()
        identifier = self.string.split(':', 1)
        schema = identifier[0].lower()
        value = identifier[1]
        use_api_service = bool(valid_dois_cache)
        validator = 'is_valid' if use_api_service else 'check_digit'
        if schema == 'doi':
            doi_manager = DOIManager(data=valid_dois_cache, use_api_service=use_api_service)
            valid_id = doi_manager.normalise(value, include_prefix=True) if getattr(doi_manager, validator)(value) else None
        elif schema == 'isbn':
            isbn_manager = ISBNManager()
            valid_id = isbn_manager.normalise(value, include_prefix=True) if getattr(isbn_manager, validator)(value) else None
        elif schema == 'issn':
            if value == '0000-0000':
                # '0000-0000' passes the ISSN checksum but is a known placeholder.
                valid_id = None
            else:
                issn_manager = ISSNManager()
                valid_id = issn_manager.normalise(value, include_prefix=True) if getattr(issn_manager, validator)(value) else None
        elif schema == 'orcid':
            orcid_manager = ORCIDManager()
            valid_id = orcid_manager.normalise(value, include_prefix=True) if getattr(orcid_manager, validator)(value) else None
        else:
            # Unknown schemas are passed through unvalidated.
            valid_id = f'{schema}:{value}'
        return valid_id

    @classmethod
    def clean_volume_and_issue(cls, row: dict) -> None:
        '''
        It normalizes the 'volume' and 'issue' fields of the given row in place,
        repairing invalid values via the patterns in master_of_regex and, when a
        value matches the pattern of the *other* field, swapping volume and issue
        (adjusting row['type'] accordingly). Dates recovered from a volume/issue
        string may also be written into row['pub_date'].
        '''
        # Values recovered while fixing invalid volume/issue strings.
        output = {'volume': '', 'issue': '', 'pub_date': ''}
        for field in {'volume', 'issue'}:
            vi = row[field]
            vi = Cleaner(vi).normalize_hyphens()
            vi = Cleaner(vi).normalize_spaces().strip()
            vi = html.unescape(vi)
            for pattern, strategy in invalid_vi_patterns.items():
                pattern = f'^{pattern}$'
                capturing_groups = re.search(pattern, vi, re.IGNORECASE)
                if capturing_groups:
                    if strategy == 'del':
                        row[field] = ''
                    elif strategy == 'do_nothing':
                        row[field] = vi
                    elif strategy == 's)':
                        row[field] = f'{vi}s)'
                    else:
                        row[field] = ''
                        whatever, volume, issue, pub_date = cls.fix_invalid_vi(capturing_groups, strategy)
                        row[field] = whatever if whatever else row[field]
                        output['volume'] = volume if volume else ''
                        output['issue'] = issue if issue else ''
                        output['pub_date'] = pub_date if pub_date else ''
        # Recovered values fill the fields only when those are still empty.
        row['volume'] = output['volume'] if not row['volume'] else row['volume']
        row['issue'] = output['issue'] if not row['issue'] else row['issue']
        row['pub_date'] = output['pub_date'] if not row['pub_date'] else row['pub_date']
        # Detect values placed in the wrong field (a volume-looking issue, or vice versa).
        switch_vi = {'volume': '', 'issue': ''}
        for field in {'volume', 'issue'}:
            vi = row[field]
            for pattern in volumes_valid_patterns:
                pattern = f'^{pattern}$'
                if re.search(pattern, vi, re.IGNORECASE):
                    if field == 'issue':
                        switch_vi['volume'] = vi
            for pattern in issues_valid_patterns:
                pattern = f'^{pattern}$'
                if re.search(pattern, vi, re.IGNORECASE):
                    if field == 'volume':
                        switch_vi['issue'] = vi
        if switch_vi['volume'] and switch_vi['issue']:
            row['volume'] = switch_vi['volume']
            row['issue'] = switch_vi['issue']
        elif switch_vi['volume'] and not row['volume']:
            row['volume'] = switch_vi['volume']
            row['issue'] = ''
            row['type'] = 'journal volume' if row['type'] == 'journal issue' else row['type']
        elif switch_vi['issue'] and not row['issue']:
            row['issue'] = switch_vi['issue']
            row['volume'] = ''
            row['type'] = 'journal issue' if row['type'] == 'journal volume' else row['type']

    @staticmethod
    def fix_invalid_vi(capturing_groups: re.Match, strategy: str) -> Tuple[str, str, str, str]:
        '''
        It extracts volume, issue, publication date or a replacement string from
        the capturing groups of an invalid volume/issue pattern, according to
        the given strategy name.

        :returns: Tuple[str, str, str, str] -- (whatever, volume, issue, pub_date),
            each element being None when the strategy does not produce it
        '''
        # The strategy name encodes the group order: 'vol_iss' means group 1 is
        # the volume, 'iss_vol' means group 1 is the issue.
        vol_group = 1 if 'vol_iss' in strategy else 2
        iss_group = 1 if 'iss_vol' in strategy else 2
        whatever = None
        volume = None
        issue = None
        pub_date = None
        if 'vol' in strategy and 'iss' in strategy:
            volume = capturing_groups.group(vol_group)
            issue = capturing_groups.group(iss_group)
            if 'year' in strategy:
                pub_date = capturing_groups.group(3)
        elif strategy == 'all':
            whatever = capturing_groups.group(1)
        elif strategy == 'sep':
            first = capturing_groups.group(1)
            second = capturing_groups.group(2)
            whatever = f'{first}-{second}'
        return whatever, volume, issue, pub_date

    def remove_ascii(self) -> str:
        '''
        It replaces ASCII control characters (C0 plus DEL) and every byte in the
        Latin-1 upper range (U+0080-U+00FF) with a space, then collapses runs of
        whitespace into single spaces.

        :returns: str -- the cleaned string
        '''
        # Same character set as the original literal: U+0000-U+001F and U+007F-U+00FF.
        unwanted_chars = {chr(i) for i in range(0x20)} | {chr(i) for i in range(0x7F, 0x100)}
        clean_string = ''.join([' ' if c in unwanted_chars else c for c in self.string])
        clean_string = ' '.join(clean_string.split())
        return clean_string