Coverage for oc_meta / lib / cleaner.py: 96%

239 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1# SPDX-FileCopyrightText: 2019 Silvio Peroni <silvio.peroni@unibo.it> 

2# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com> 

3# SPDX-FileCopyrightText: 2021 Simone Persiani <iosonopersia@gmail.com> 

4# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8import html 

9import re 

10from collections import OrderedDict 

11from datetime import datetime 

12from typing import Tuple, Union 

13 

14from dateutil.parser import parse 

15from oc_ds_converter.oc_idmanager import (DOIManager, ISBNManager, ISSNManager, 

16 ORCIDManager) 

17 

18from oc_meta.lib.master_of_regex import ( 

19 RE_COMMA_AND_SPACES, 

20 RE_INVALID_VI_PATTERNS, 

21 RE_ISSUES_VALID_PATTERNS, 

22 RE_VOLUMES_VALID_PATTERNS, 

23 split_name_and_ids, 

24) 

25 

26_HYPHEN_TRANS = str.maketrans({ 

27 '\u00AD': '\u002D', # Soft hyphen 

28 '\u06D4': '\u002D', # Arabic Full Stop 

29 '\u2010': '\u002D', # Hyphen 

30 '\u2011': '\u002D', # Non-breaking Hyphen 

31 '\u2012': '\u002D', # Figure Dash 

32 '\u2013': '\u002D', # En-Dash 

33 '\u2014': '\u002D', # Em-Dash 

34 '\u2043': '\u002D', # Hyphen Bullet 

35 '\u2212': '\u002D', # Minus Sign 

36 '\u2796': '\u002D', # Heavy Minus Sign 

37 '\u2CBA': '\u002D', # Coptic Capital Letter Dialect-p Ni 

38 '\uFE58': '\u002D', # Small Em Dash 

39}) 

40 

41_SPACE_TRANS = str.maketrans({ 

42 '\u0009': '\u0020', # Character Tabulation 

43 '\u00A0': '\u0020', # No-break space 

44 '\u200B': '\u0020', # Zero width space 

45 '\u202F': '\u0020', # Narrow no-break space 

46 '\u2003': '\u0020', # Em Space 

47 '\u2005': '\u0020', # Four-Per-Em Space 

48 '\u2009': '\u0020', # Thin Space 

49}) 

50 

51# Translation table for control characters and extended ASCII to space 

52# Covers: 0x00-0x1F (control chars), 0x7F (DEL), 0x80-0xFF (extended ASCII) 

53_ASCII_CONTROL_TRANS = str.maketrans( 

54 {chr(i): ' ' for i in range(0x00, 0x20)} 

55 | {chr(0x7F): ' '} 

56 | {chr(i): ' ' for i in range(0x80, 0x100)} 

57) 

58 

# Module-level identifier managers, shared across calls to normalize_id().
# API-based lookups are disabled (use_api_service=False) where the manager
# accepts it, so validation is purely syntactic/offline.
_DOI_MANAGER = DOIManager(use_api_service=False, storage_manager=None)
_ISBN_MANAGER = ISBNManager()
_ISSN_MANAGER = ISSNManager()
_ORCID_MANAGER = ORCIDManager(use_api_service=False, storage_manager=None)

63 

64 

def normalize_hyphens(string: str) -> str:
    '''
    Replace every hyphen, dash and minus-sign look-alike with the ASCII
    hyphen-minus (U+002D). Used for pages, identifiers and dates.

    Characters normalized: soft hyphen (U+00AD), Arabic full stop (U+06D4),
    hyphen (U+2010), non-breaking hyphen (U+2011), figure dash (U+2012),
    en dash (U+2013), em dash (U+2014), hyphen bullet (U+2043), minus sign
    (U+2212), heavy minus sign (U+2796), Coptic capital letter dialect-p ni
    (U+2CBA) and small em dash (U+FE58).

    :param string: str -- the string to normalize
    :returns: str -- the string with every look-alike replaced by U+002D
    '''
    return string.translate(_HYPHEN_TRANS)

120 

121 

def normalize_spaces(string: str) -> str:
    '''
    Replace every ambiguous space-like character with a plain space (U+0020).

    Handled characters: character tabulation (U+0009), no-break space
    (U+00A0), zero width space (U+200B), narrow no-break space (U+202F),
    em space (U+2003), four-per-em space (U+2005), thin space (U+2009),
    plus the literal HTML entity '&nbsp;'.

    :param string: str -- the string to normalize
    :returns: str -- the string with normalized spaces
    '''
    # The entity is plain ASCII, so replacing it before (rather than after)
    # the single-character translation yields the same result.
    without_entity = string.replace('&nbsp;', '\u0020')
    return without_entity.translate(_SPACE_TRANS)

152 

153 

def clean_title(string: str, normalize: bool = True) -> str:
    '''
    Clean the title of a bibliographic resource ('venue' and 'title' columns).

    When *normalize* is true, every word is capitalized except those that
    already contain capitals (probable acronyms, e.g. 'FaBiO and CiTO');
    a fully upper-case title is first lower-cased so the exception does not
    apply to it. Superfluous whitespace is collapsed as a side effect.
    When *normalize* is false, the input is returned untouched.

    :param string: str -- the title to clean
    :param normalize: bool -- whether to apply capitalization rules
    :returns: str -- the cleaned title
    '''
    if not normalize:
        return string
    text = string.lower() if string.isupper() else string
    capitalized = [
        token if any(ch.isupper() for ch in token) else token.title()
        for token in text.split()
    ]
    return ' '.join(capitalized)

173 

174 

def _date_parse_hack(date: str) -> str:
    '''
    Parse *date* and format only the components the string actually contains.

    The string is parsed twice with two different default dates; a component
    (year, month, day) that differs between the two parses was filled in from
    the default, i.e. it is absent from the input, and is dropped from the
    output.

    :param date: str -- the date string to parse
    :returns: str -- 'YYYY-MM-DD', 'YYYY-MM', 'YYYY', or '' when even the
        year is absent
    :raises ValueError: if *date* cannot be parsed at all
    '''
    dt = parse(date, default=datetime(2001, 1, 1))
    dt2 = parse(date, default=datetime(2002, 2, 2))

    if dt.year != dt2.year:
        return ''
    # Reuse the first parse instead of calling parse(date) again: a bare
    # parse() fills missing components from *today's* date, which raises
    # ValueError for month-only inputs at the end of a month (e.g.
    # 'Feb 2020' parsed on January 31st would try to build February 31st).
    if dt.month == dt2.month and dt.day == dt2.day:
        return dt.strftime('%Y-%m-%d')
    if dt.month == dt2.month:
        return dt.strftime('%Y-%m')
    return dt.strftime('%Y')

188 

189 

def clean_date(string: str) -> str:
    '''
    Parse a date string, checking both the format (YYYY-MM-DD) and the value
    (e.g. 30 February is not a valid date), and keep the widest valid prefix.

    For example, '2020-02-30' becomes '2020-02' because the day is invalid,
    while '2020-27-12' becomes '2020' since both day and month are invalid.
    If even the year is unusable (e.g. year > 9999) the whole value is
    discarded and an empty string is returned.

    :param string: str -- the date string to clean
    :returns: str -- the cleaned date or an empty string
    '''
    candidates = [string]
    if len(string) == 10:
        # e.g. '2021-12-17': retry without the day, then without the month
        candidates.extend((string[:-3], string[:-6]))
    elif len(string) == 7:
        # e.g. '2021-12': retry without the month
        candidates.append(string[:-3])
    for candidate in candidates:
        try:
            return _date_parse_hack(candidate)
        except ValueError:
            continue
    return ''

232 

233 

def clean_name(string: str) -> str:
    '''
    Capitalize the first letter of each element of a person name and remove
    superfluous spaces. Names in 'Surname, Given' form keep that shape; a
    comma-form name with an empty surname collapses to the empty string.

    :param string: str -- the name to clean
    :returns: str -- the cleaned name
    '''
    if ',' not in string:
        return ' '.join(clean_title(part) for part in string.split())
    pieces = RE_COMMA_AND_SPACES.split(string)
    given = ' '.join(clean_title(part) for part in pieces[1].split())
    family = ' '.join(clean_title(part) for part in pieces[0].split())
    if not family:
        return ''
    return f'{family}, {given}'

258 

259 

def clean_agent_name(string: str) -> str:
    '''
    Clean a responsible agent name (author, editor, publisher).

    Removes unwanted characters ('[', ']', ';', '?') while keeping letters,
    numbers, spaces, '&', apostrophes, and dots preceded by a letter.
    Numbers and '&' are kept for organization names (e.g., "3M",
    "Smith & Sons"). HTML entities are decoded, hyphen look-alikes are
    normalized, and extra spaces are removed.

    :param string: str -- the agent name to clean
    :returns: str -- the cleaned agent name
    '''
    banned = {'[', ']', ';', '?'}
    kept = []
    previous = ''
    for ch in string:
        if ch == '.':
            # a dot survives only when it follows a letter (initials, 'Inc.')
            if previous.isalpha():
                kept.append(ch)
        elif ch not in banned:
            kept.append(ch)
        previous = ch
    collapsed = ' '.join(''.join(kept).split())
    decoded = html.unescape(collapsed)
    return decoded.translate(_HYPHEN_TRANS)

283 

284 

def _normalize_ra_name(raw_name: str) -> str:
    """Normalize a RA name into one of: '', 'Full Name', 'Last, First', 'Last, '.

    Returns '' when the name is absent, literally 'Not Available', or a
    comma-separated pair whose surname is missing. Bare names are run
    through :func:`clean_agent_name` to drop bracket / punctuation junk.
    """
    name = raw_name.strip()
    if not name:
        return ''
    if ',' not in name:
        bare = clean_agent_name(name)
        return '' if bare.lower() == 'not available' else bare
    surname, _, given = name.partition(',')
    surname = surname.strip()
    given = given.strip()
    if surname.lower() == 'not available':
        surname = ''
    if given.lower() == 'not available':
        given = ''
    if not surname:
        return ''
    # keep the trailing ', ' even when the given name is empty
    return f'{surname}, {given}' if given else f'{surname}, '

310 

311 

def clean_ra_list(ra_list: list) -> list:
    '''
    Clean a list of responsible agents: normalize names, drop 'Not Available'
    entries, remove duplicates that share a name and at least one id, and
    strip identifiers that appear under more than one agent.

    :param ra_list: list -- the responsible agents to clean
    :returns: list -- the cleaned responsible agents' list
    '''

    # Phase 1: parse each entry into (name, ids) and bucket ids by agent.
    # Named entries share a bucket per normalized name; nameless (ids-only)
    # entries are bucketed by their raw input so each stays distinct.
    entries: list[tuple[str, list[str]]] = []
    ids_by_key: dict[str, set[str]] = {}
    for item in ra_list:
        raw_name, id_part = split_name_and_ids(item)
        agent_name = _normalize_ra_name(raw_name)
        agent_ids = id_part.split()
        if not agent_name and not agent_ids:
            continue
        bucket_key = agent_name if agent_name else item
        entries.append((agent_name, agent_ids))
        if agent_ids:
            ids_by_key.setdefault(bucket_key, set()).update(agent_ids)

    # Phase 2: an identifier found in more than one bucket is shared and
    # must be dropped — it cannot unambiguously identify a single agent.
    tally: dict[str, int] = {}
    for bucket in ids_by_key.values():
        for ident in bucket:
            tally[ident] = tally.get(ident, 0) + 1
    ambiguous = {ident for ident, count in tally.items() if count > 1}

    # Phase 3: emit cleaned entries in input order, skipping any later entry
    # that shares a surviving id with a previous entry of the same name.
    result: list[str] = []
    known_ids: dict[str, set[str]] = {}
    for agent_name, agent_ids in entries:
        surviving = [ident for ident in agent_ids if ident not in ambiguous]
        joined = ' '.join(surviving)
        if not agent_name:
            result.append(f'[{joined}]')
            continue
        surviving_set = set(surviving)
        previous = known_ids.get(agent_name)
        if previous is not None and previous & surviving_set:
            continue
        known_ids.setdefault(agent_name, set()).update(surviving_set)
        result.append(f'{agent_name} [{joined}]' if surviving else agent_name)
    return result

365 

366 

def normalize_id(string: str) -> Union[str, None]:
    '''
    Verify and normalize an identifier whose schema corresponds to a DOI,
    an ISSN, an ISBN or an ORCID. Identifiers with any other schema are
    returned with the schema lower-cased and the value untouched.

    :param string: str -- an identifier in the form 'schema:value'
    :returns: Union[str, None] -- the normalized identifier if it is valid, None otherwise
    '''
    schema, value = string.split(':', 1)
    schema = schema.lower()
    if schema == 'doi':
        return _DOI_MANAGER.normalise(value, include_prefix=True) if _DOI_MANAGER.syntax_ok(value) else None
    if schema == 'isbn':
        return _ISBN_MANAGER.normalise(value, include_prefix=True) if _ISBN_MANAGER.is_valid(value, get_extra_info=False) else None
    if schema == 'issn':
        # '0000-0000' is a placeholder, never a real serial identifier
        if value == '0000-0000':
            return None
        try:
            return _ISSN_MANAGER.normalise(value, include_prefix=True) if _ISSN_MANAGER.is_valid(value, get_extra_info=False) else None
        except ValueError:
            # Log the offending value, then re-raise the ORIGINAL exception:
            # the previous `raise(ValueError)` discarded its message and
            # traceback, making the failure impossible to diagnose upstream.
            print(value)
            raise
    if schema == 'orcid':
        return _ORCID_MANAGER.normalise(value, include_prefix=True) if _ORCID_MANAGER.is_valid(value, get_extra_info=False) else None
    return f'{schema}:{value}'

394 

395 

def clean_volume_and_issue(row: dict) -> None:
    '''
    Clean the 'volume' and 'issue' fields of *row* in place.

    Hyphens, spaces and HTML entities are normalized first; values matching
    a known invalid pattern are repaired according to the pattern's strategy
    (possibly recovering a volume, an issue or a publication date embedded in
    the field). Finally, a volume-looking value found in 'issue' and/or an
    issue-looking value found in 'volume' are swapped into the right field,
    adjusting row['type'] between 'journal volume' and 'journal issue'.

    :param row: dict -- a row with 'volume', 'issue', 'pub_date' and 'type' keys
    :returns: None -- *row* is modified in place
    '''
    output = {'volume': '', 'issue': '', 'pub_date': ''}
    # Iterate over a tuple, not a set: both fields can write into `output`,
    # so with a set the processing order — and therefore the final values —
    # would be nondeterministic across runs.
    for field in ('volume', 'issue'):
        vi = row[field]
        vi = normalize_hyphens(vi)
        vi = normalize_spaces(vi).strip()
        vi = html.unescape(vi)
        for compiled_pattern, strategy in RE_INVALID_VI_PATTERNS.items():
            capturing_groups = compiled_pattern.search(vi)
            if capturing_groups:
                if strategy == 'del':
                    row[field] = ''
                elif strategy == 'do_nothing':
                    row[field] = vi
                elif strategy == 's)':
                    # the value lost its closing 's)': restore it
                    row[field] = f'{vi}s)'
                else:
                    # extraction strategies: pull volume/issue/pub_date out
                    # of the invalid value via the pattern's capture groups
                    row[field] = ''
                    whatever, volume, issue, pub_date = _fix_invalid_vi(capturing_groups, strategy)
                    row[field] = whatever if whatever else row[field]
                    output['volume'] = volume if volume else ''
                    output['issue'] = issue if issue else ''
                    output['pub_date'] = pub_date if pub_date else ''
    # extracted values only fill fields that are still empty
    row['volume'] = output['volume'] if not row['volume'] else row['volume']
    row['issue'] = output['issue'] if not row['issue'] else row['issue']
    row['pub_date'] = output['pub_date'] if not row['pub_date'] else row['pub_date']
    # detect values that landed in the wrong field
    switch_vi = {'volume': '', 'issue': ''}
    for field in ('volume', 'issue'):
        vi = row[field]
        for compiled_pattern in RE_VOLUMES_VALID_PATTERNS:
            if compiled_pattern.search(vi):
                if field == 'issue':
                    switch_vi['volume'] = vi
        for compiled_pattern in RE_ISSUES_VALID_PATTERNS:
            if compiled_pattern.search(vi):
                if field == 'volume':
                    switch_vi['issue'] = vi
    if switch_vi['volume'] and switch_vi['issue']:
        row['volume'] = switch_vi['volume']
        row['issue'] = switch_vi['issue']
    elif switch_vi['volume'] and not row['volume']:
        row['volume'] = switch_vi['volume']
        row['issue'] = ''
        row['type'] = 'journal volume' if row['type'] == 'journal issue' else row['type']
    elif switch_vi['issue'] and not row['issue']:
        row['issue'] = switch_vi['issue']
        row['volume'] = ''
        row['type'] = 'journal issue' if row['type'] == 'journal volume' else row['type']

445 

446def _fix_invalid_vi(capturing_groups: re.Match, strategy: str) -> Tuple[str | None, str | None, str | None, str | None]: 

447 vol_group = 1 if 'vol_iss' in strategy else 2 

448 iss_group = 1 if 'iss_vol' in strategy else 2 

449 whatever = None 

450 volume = None 

451 issue = None 

452 pub_date = None 

453 if 'vol' in strategy and 'iss' in strategy: 

454 volume = capturing_groups.group(vol_group) 

455 issue = capturing_groups.group(iss_group) 

456 if 'year' in strategy: 

457 pub_date = capturing_groups.group(3) 

458 elif strategy == 'all': 

459 whatever = capturing_groups.group(1) 

460 elif strategy == 'sep': 

461 first = capturing_groups.group(1) 

462 second = capturing_groups.group(2) 

463 whatever = f'{first}-{second}' 

464 return whatever, volume, issue, pub_date 

465 

466 

def remove_ascii(string: str) -> str:
    '''
    Replace control characters (0x00-0x1F, DEL) and extended-ASCII bytes
    (0x80-0xFF) with spaces, then collapse runs of whitespace to single
    spaces and trim the ends.

    :param string: str -- the string to clean
    :returns: str -- the cleaned string
    '''
    translated = string.translate(_ASCII_CONTROL_TRANS)
    return ' '.join(translated.split())