Coverage for oc_ds_converter / crossref / crossref_processing.py: 85%

312 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-06-12 21:23 +0000

1# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com> 

2# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it> 

4# SPDX-FileCopyrightText: 2023-2025 Arianna Moretti <arianna.moretti4@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8from __future__ import annotations 

9 

10import re 

11from collections import defaultdict 

12from typing import List, Optional, Tuple 

13 

14from oc_ds_converter.lib.cleaner import Cleaner 

15from oc_ds_converter.lib.crossref_style_processing import CrossrefStyleProcessing 

16from oc_ds_converter.lib.master_of_regex import ids_inside_square_brackets, pages_separator 

17from oc_ds_converter.ra_processor import families_match 

18from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

19 

20 

21class CrossrefProcessing(CrossrefStyleProcessing): 

22 

def __init__(
    self,
    orcid_index: str | None = None,
    publishers_filepath: str | None = None,
    storage_manager: StorageManager | None = None,
    testing: bool = True,
    citing: bool = True,
    use_orcid_api: bool = True,
    use_redis_orcid_index: bool = False,
    use_redis_publishers: bool = False,
    exclude_existing: bool = False,
):
    """Initialise the processor and remember the ``exclude_existing`` flag.

    Every argument except ``exclude_existing`` is passed on, by keyword and
    unchanged, to the parent class initialiser. ``exclude_existing`` is only
    stored on the instance; this method does not act on it.
    """
    parent_options = {
        'orcid_index': orcid_index,
        'publishers_filepath': publishers_filepath,
        'storage_manager': storage_manager,
        'testing': testing,
        'citing': citing,
        'use_orcid_api': use_orcid_api,
        'use_redis_orcid_index': use_redis_orcid_index,
        'use_redis_publishers': use_redis_publishers,
    }
    super().__init__(**parent_options)
    self.exclude_existing = exclude_existing

46 

def _extract_doi(self, item: dict) -> str:
    """Return the raw (not yet normalised) DOI of ``item`` as a string.

    The ``'DOI'`` value may be a scalar or a list; for a list the first
    element is used. A missing key, an empty list, or an explicit ``None``
    (either as the value or as the first list element) yields ``''`` so that
    callers' ``if not doi`` checks behave as intended.
    """
    doi = item.get('DOI')
    if isinstance(doi, list):
        doi = doi[0] if doi else None
    # str(None) would give the truthy text 'None', which would slip past the
    # emptiness guards in the callers and be handed to the DOI normaliser.
    if doi is None:
        return ''
    return str(doi)

52 

def _extract_title(self, item: dict) -> str:
    """Return the item's title with markup cleaned, or ``''`` if there is none.

    When ``item['title']`` is a list only its first element is used. The
    chosen value is converted with ``str`` and passed through
    ``self.clean_markup``. A missing or falsy title gives ``''`` without
    calling ``clean_markup``.
    """
    raw_title = item.get('title')
    if raw_title:
        # raw_title is truthy here, so a list is guaranteed to be non-empty.
        chosen = raw_title[0] if isinstance(raw_title, list) else raw_title
        return self.clean_markup(str(chosen))
    return ''

60 

def _extract_agents(self, item: dict) -> list[dict]:
    """Merge the item's authors and editors into one role-tagged list.

    Each entry of ``item['author']``, then each entry of ``item['editor']``,
    is copied into a new dict whose ``'role'`` key is set to ``'author'`` or
    ``'editor'`` (replacing any ``'role'`` the entry already had). A key that
    is absent from ``item`` contributes nothing.
    """
    tagged: list[dict] = []
    for role in ('author', 'editor'):
        if role in item:
            tagged.extend({**person, 'role': role} for person in item[role])
    return tagged

70 

def _extract_venue(self, item: dict) -> str:
    """Return the venue string for ``item`` as built by ``get_venue_name``.

    ``get_venue_name`` expects a row-like dict with a ``'type'`` key, so a
    minimal one is built from ``self._extract_type(item)``.
    """
    type_only_row = {'type': self._extract_type(item)}
    return self.get_venue_name(item, type_only_row)

74 

def _extract_pub_date(self, item: dict) -> str:
    """Return the issued date as its parts joined with ``'-'``.

    Reads the first entry of ``item['issued']['date-parts']`` and joins the
    ``str`` of every element with hyphens, without zero-padding. Returns
    ``''`` when ``'issued'`` is missing, when there are no date parts, or
    when the first part (the year) is falsy.
    """
    if 'issued' in item:
        all_parts = item['issued'].get('date-parts', [[]])
        first_date = all_parts[0] if all_parts else None
        if first_date and first_date[0]:
            return '-'.join(map(str, first_date))
    return ''

82 

def _extract_pages(self, item: dict) -> str:
    """Return the page string for ``item``, or ``''`` if it has no ``'page'``.

    The raw value is split with the module-level ``pages_separator`` pattern
    and the resulting pieces are handed to ``self.get_pages``.
    """
    if 'page' in item:
        page_tokens = re.split(pages_separator, item['page'])
        return self.get_pages(page_tokens)
    return ''

88 

def _extract_type(self, item: dict) -> str:
    """Return the item's ``'type'`` with hyphens replaced by spaces.

    A missing or falsy type gives ``''``.
    """
    raw_type = item.get('type', '')
    return raw_type.replace('-', ' ') if raw_type else ''

94 

def _extract_publisher(self, item: dict) -> str:
    """Return the publisher string for ``item``, or ``''`` without a usable DOI.

    The DOI from ``self._extract_doi`` is normalised without its prefix via
    ``self.doi_m.normalise``; only when that succeeds is
    ``self.get_publisher_name`` consulted.
    """
    raw_doi = self._extract_doi(item)
    normalised = self.doi_m.normalise(raw_doi, include_prefix=False) if raw_doi else None
    if normalised:
        return self.get_publisher_name(normalised, item)
    return ''

103 

def csv_creator(self, item: dict) -> dict:
    """Build one output row (a dict of strings) from a Crossref item.

    Returns ``{}`` when the item has no DOI or the DOI cannot be normalised.
    Otherwise the row has the keys ``id``, ``title``, ``author``, ``issue``,
    ``volume``, ``venue``, ``pub_date``, ``page``, ``type``, ``publisher``
    and ``editor`` (in that order) and is passed through
    ``self.normalise_unicode`` before being returned.

    The ``id`` field always starts with ``doi:<normalised doi>``. ISBNs are
    added only for book-like types and ISSNs only for series-like types; a
    ``report series`` receives its ISSN here only when it has no
    ``container-title``.
    """
    raw_doi = self._extract_doi(item)
    norm_doi = self.doi_m.normalise(raw_doi, include_prefix=False) if raw_doi else None
    if not norm_doi:
        return {}

    item_type = self._extract_type(item)

    # Types for which the ISBN / ISSN identifies the resource itself.
    isbn_bearing_types = {
        'book', 'dissertation', 'edited book', 'monograph',
        'reference book', 'report', 'standard',
    }
    issn_bearing_types = {
        'book series', 'book set', 'journal', 'proceedings series',
        'series', 'standard series',
    }

    identifiers = [f'doi:{norm_doi}']
    if 'ISBN' in item and item_type in isbn_bearing_types:
        self.id_worker(item['ISBN'], identifiers, self.isbn_worker)
    if 'ISSN' in item:
        uncontained_report_series = (
            item_type == 'report series' and not item.get('container-title')
        )
        if item_type in issn_bearing_types or uncontained_report_series:
            self.id_worker(item['ISSN'], identifiers, self.issn_worker)

    author_names, editor_names = self.get_agents_strings_list(
        norm_doi, self._extract_agents(item)
    )

    row = {
        'id': ' '.join(identifiers),
        'title': self._extract_title(item),
        'author': '; '.join(author_names),
        'issue': self._extract_issue(item),
        'volume': self._extract_volume(item),
        'venue': self._extract_venue(item),
        'pub_date': self._extract_pub_date(item),
        'page': self._extract_pages(item),
        'type': item_type,
        'publisher': self._extract_publisher(item),
        'editor': '; '.join(editor_names),
    }
    return self.normalise_unicode(row)

144 

def get_crossref_pages(self, item: dict) -> str:
    """Return the page string for ``item``, or ``''`` if it has no ``'page'``.

    The raw ``item['page']`` value is split with the module-level
    ``pages_separator`` pattern and the pieces are handed to
    ``self.get_pages``.

    An item without a ``'page'`` key yields ``''`` instead of raising
    ``KeyError``, matching what ``_extract_pages`` does for the same input.
    """
    if 'page' not in item:
        return ''
    pages_list = re.split(pages_separator, item['page'])
    return self.get_pages(pages_list)

148 

def get_publisher_name(self, doi: str, item: dict) -> str:
    """Return the publisher label for ``item``, tagged with a Crossref member id when known.

    Resolution order:

    1. If the item has a ``member`` and ``_get_publisher_name_by_member``
       knows it: ``"<name> [crossref:<member>]"``.
    2. Otherwise, if ``get_publisher_by_prefix`` resolves the DOI prefix
       (``item['prefix']`` when truthy, else the part of ``doi`` before the
       first ``/``): ``"<name> [crossref:<member id from the lookup>]"``.
    3. Otherwise the item's own ``publisher`` text (``''`` if absent),
       suffixed with ``" [crossref:<member>]"`` only when the item has a
       member.
    """
    member = item.get('member')
    declared_publisher = item.get('publisher', '')
    doi_prefix = item.get('prefix') or doi.split('/')[0]

    if member:
        known_name = self._get_publisher_name_by_member(str(member))
        if known_name:
            return f'{known_name} [crossref:{member}]'

    by_prefix = self.get_publisher_by_prefix(doi_prefix)
    if by_prefix:
        prefix_name, prefix_member = by_prefix
        return f'{prefix_name} [crossref:{prefix_member}]'

    if member:
        return f'{declared_publisher} [crossref:{member}]'
    return declared_publisher

165 

def _get_publisher_name_by_member(self, member: str) -> str | None:
    """Look up a publisher name by Crossref member id.

    When ``self.use_redis_publishers`` is set and ``self._publishers_redis``
    is truthy, only that backend is queried (``get_by_member``); a miss there
    returns ``None`` without falling back to the in-memory mapping.
    Otherwise ``self.publishers_mapping`` is consulted. Returns ``None`` when
    the member is unknown.
    """
    redis_backend = self._publishers_redis if self.use_redis_publishers else None
    if redis_backend:
        record = redis_backend.get_by_member(member)
        return str(record['name']) if record else None

    mapping = self.publishers_mapping
    if mapping and member in mapping:
        return str(mapping[member]['name'])
    return None

175 

176 

def get_venue_name(self, item: dict, row: dict) -> str:
    """Return the venue as ``"<title>"`` or ``"<title> [<ids>]"``.

    The title is the first element of ``item['container-title']`` (or the
    value itself when it is not a list), passed through
    ``self.clean_markup``. An item without a truthy ``container-title``
    gives ``''``.

    If the title contains text matched by ``ids_inside_square_brackets``,
    the two characters immediately surrounding the captured group are
    replaced with ``(`` and ``)``, so the title's own brackets cannot be
    confused with the identifier list appended below.

    Identifiers are collected through ``self.id_worker``: ISBNs when
    ``row['type']`` is a book-part type, ISSNs when it is one of the
    journal/book/report-like types listed below or ``'report series'``.
    ``row['type']`` is only read when the item has an ``ISBN`` or ``ISSN``.
    """
    container = item.get('container-title')
    if not container:
        return ''

    venue_title = str(container[0] if isinstance(container, list) else container)
    venue_title = self.clean_markup(venue_title)

    bracketed_ids = re.search(ids_inside_square_brackets, venue_title)
    # Work from the match offsets rather than str.find(): find() returns the
    # FIRST occurrence of the captured text, which is the wrong position when
    # the same text also appears earlier in the title, and index 0 when the
    # capture is empty. start(1) > 0 also skips a capture that did not
    # participate or has no character in front of it to replace.
    if bracketed_ids and bracketed_ids.start(1) > 0:
        open_bracket = bracketed_ids.start(1) - 1
        close_bracket = bracketed_ids.end(1)
        venue_title = venue_title[:open_bracket] + '(' + venue_title[open_bracket + 1:]
        venue_title = venue_title[:close_bracket] + ')' + venue_title[close_bracket + 1:]

    venue_ids: list = []
    if 'ISBN' in item and row['type'] in {
        'book chapter', 'book part', 'book section', 'book track', 'reference entry',
    }:
        self.id_worker(item['ISBN'], venue_ids, self.isbn_worker)

    if 'ISSN' in item:
        issn_venue_types = {
            'book', 'data file', 'dataset', 'edited book', 'journal article',
            'journal volume', 'journal issue', 'monograph', 'proceedings',
            'peer review', 'reference book', 'reference entry', 'report',
        }
        # A report series takes the ISSN as a venue id whenever it has a
        # container title, which is already guaranteed at this point.
        if row['type'] in issn_venue_types or row['type'] == 'report series':
            self.id_worker(item['ISSN'], venue_ids, self.issn_worker)

    if venue_ids:
        return venue_title + ' [' + ' '.join(venue_ids) + ']'
    return venue_title

210 

def extract_all_ids(
    self, entity_dict: dict, is_citing: bool
) -> tuple[list[str], list[str]]:
    """Collect the normalised identifiers mentioned by a Crossref entity.

    Returns ``(br_ids, ra_ids)``, both de-duplicated lists in no particular
    order.

    * ``is_citing`` true: ``ra_ids`` holds the normalised ORCIDs found under
      the ``"ORCID"`` key of the entity's authors and editors; ``br_ids`` is
      empty.
    * ``is_citing`` false: ``br_ids`` holds the normalised DOIs of the
      entity's ``"reference"`` entries that carry a truthy ``"DOI"``;
      ``ra_ids`` is empty.

    Values that ``normalise`` rejects (falsy result) are dropped.
    """
    br_ids: set[str] = set()
    ra_ids: set[str] = set()

    if not is_citing:
        # Cited bibliographic resources: the references that carry a DOI.
        doi_bearing_refs = [
            ref for ref in entity_dict.get("reference", []) if ref.get("DOI")
        ]
        for ref in doi_bearing_refs:
            cited_doi = self.doi_m.normalise(ref["DOI"], include_prefix=True)
            if cited_doi:
                br_ids.add(cited_doi)
    else:
        # Responsible agents of the citing entity: authors first, then editors.
        for role in ("author", "editor"):
            for agent in entity_dict.get(role) or ():
                if "ORCID" not in agent:
                    continue
                agent_orcid = self.orcid_m.normalise(agent["ORCID"], include_prefix=True)
                if agent_orcid:
                    ra_ids.add(agent_orcid)

    return list(br_ids), list(ra_ids)

242 

def get_agents_strings_list(self, doi: str, agents_list: List[dict]) -> Tuple[list, list]:
    """Format authors and editors as display strings, tagging them with ORCIDs where possible.

    Each agent dict is expected to carry a ``"role"`` (``"author"`` or
    ``"editor"``; agents with any other role are formatted but not returned)
    plus ``"family"``/``"given"`` and/or ``"name"``.

    For every agent the ORCID is looked for in two places, in this order:

    1. the agent's own ``"orcid"``/``"ORCID"`` field, checked through
       ``self.find_crossref_orcid``;
    2. the entries ``self.orcid_finder(doi)`` returns for this DOI, matched
       by name with the disambiguation rules of ``_match_orcid`` below.

    A resolved ORCID is appended to the display string as ``" [orcid:...]"``
    and recorded with ``self.tmp_orcid_m.storage_manager.set_value(oc, True)``.

    :param doi: DOI used for the index lookup; a falsy value skips the lookup.
    :param agents_list: role-tagged agent dicts.
    :returns: ``(authors_strings_list, editors_string_list)``.
    """
    authors_strings_list = []
    editors_string_list = []

    # --- 1) DOI → index lookup ---
    raw_index = self.orcid_finder(doi) if doi else None

    # --- 2) Parse the index → candidate list (family, given, normalised ORCID) ---
    def _split_name(text: str) -> Tuple[str, Optional[str]]:
        # Drop a trailing "[...]" suffix, then split into (family, given):
        # "Family, Given" if there is a comma, otherwise the last
        # whitespace-separated token is taken as the family name.
        base = re.sub(r'\s*\[.*?\]\s*$', '', text).strip()
        if ',' in base:
            fam, giv = [p.strip() for p in base.split(',', 1)]
            return fam, (giv if giv else None)
        toks = base.split()
        if len(toks) >= 2:
            return toks[-1], ' '.join(toks[:-1])
        return base, None

    candidates: List[Tuple[str, Optional[str], str]] = []
    if raw_index:
        for k, v in raw_index.items():
            # Index keys may or may not carry the "orcid:" prefix; add it before normalising.
            oc = k if str(k).lower().startswith("orcid:") else f"orcid:{k}"
            oc_norm = self.orcid_m.normalise(oc, include_prefix=True)
            if not oc_norm:
                continue
            fam, giv = _split_name(v)
            candidates.append((fam.lower(), (giv or "").lower() or None, oc_norm))

    # --- 3) Agent clean-up ---
    # Only the name fields are cleaned; every other key is copied through untouched.
    agents_list = [
        {
            k: Cleaner(v).remove_unwanted_characters()
            if k in {"family", "given", "name"} and v is not None else v
            for k, v in agent_dict.items()
        }
        for agent_dict in agents_list
    ]

    def _format_person(a: dict) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        # Returns (family, given, display string); display is None when the
        # agent has no usable name, which makes the caller skip it.
        family = a.get("family")
        given = a.get("given")
        if family and given:
            return family, given, f"{family}, {given}"
        if a.get("name"):
            base = a["name"]
            if "," in base:
                fam, giv = [p.strip() for p in base.split(",", 1)]
                return fam, (giv if giv else None), (f"{fam}, {giv}" if giv else fam)
            return base, None, base
        if family:
            return family, given, f"{family}, {given or ''}"
        if given:
            return None, given, f", {given}"
        return None, None, None

    # --- local helpers for normalisation and initials ---
    def _norm(s: str) -> str:
        # Lower-case, trim, and collapse internal whitespace runs to one space.
        return re.sub(r"\s+", " ", (s or "").strip().lower())

    def _initial_from_given(given: Optional[str]) -> str:
        # First ASCII letter or digit of the first token (tokens are split on
        # whitespace and hyphens), lower-cased; "" when there is none.
        if not given:
            return ""
        first_token = re.split(r"[\s\-]+", given.strip())[0]
        m = re.search(r"[A-Za-z0-9]", first_token)
        return m.group(0).lower() if m else ""

    # --- counters for *per-role* disambiguation (author/editor) ---
    name_counts = defaultdict(int)  # key: (role, fam_norm, given_norm)
    initial_counts = defaultdict(int)  # key: (role, fam_norm, initial)

    for _a in agents_list:
        role_n = (_a.get("role") or "").strip().lower()
        fam_n = _norm(_a.get("family") or "")
        giv_n = _norm(_a.get("given") or "")
        ini_n = _initial_from_given(_a.get("given"))
        name_counts[(role_n, fam_n, giv_n)] += 1
        if fam_n and ini_n:
            initial_counts[(role_n, fam_n, ini_n)] += 1

    # --- set of (family_norm, given_norm) pairs present in the batch ---
    name_pairs = set()
    for _a in agents_list:
        fam_n = _norm(_a.get("family") or "")
        giv_n = _norm(_a.get("given") or "")
        name_pairs.add((fam_n, giv_n))

    def _match_orcid(fam: Optional[str], giv: Optional[str], role: str) -> Optional[str]:
        # Pick the index ORCID for one agent, or None when there is no
        # candidate or the match would be ambiguous.
        if not fam:
            return None

        role_n = (role or "").strip().lower()
        fam_n = _norm(fam)
        giv_n = _norm(giv) if giv else ""
        init = _initial_from_given(giv)

        # filter index candidates by family name (token-subset: handles compound
        # surnames without short fragments such as "li" matching "gladilin")
        def fam_ok(cf: str) -> bool:
            return families_match(cf, fam_n)

        cands = [(cf, cg, oc) for (cf, cg, oc) in candidates if fam_ok(cf)]
        if not cands:
            return None

        # A) exact namesakes in the *same role* → do not attach a tag
        if name_counts.get((role_n, fam_n, giv_n), 0) > 1:
            return None

        # 1) STRONG MATCH: identical full given name
        strong = [(cf, cg, oc) for (cf, cg, oc) in cands if cg and giv_n and _norm(cg) == giv_n]
        if len(strong) == 1:
            return strong[0][2]
        elif len(strong) > 1:
            # Several index entries match: accept only if they all carry the same ORCID.
            orcids = {oc for (_, _, oc) in strong if oc}
            return orcids.pop() if len(orcids) == 1 else None

        # --- inversion guard ---
        # If the batch contains the inverted pair (family=cg, given=fam) for an index candidate,
        # disable the initial-based fallback (avoids tagging a false positive).
        inversion_present = any(
            cg and (_norm(cg), fam_n) in name_pairs
            for (_, cg, _) in cands
        )
        if inversion_present:
            return None

        # 2) FALLBACK ON INITIAL (only if UNIQUE *within the same role*)
        if init:
            if initial_counts.get((role_n, fam_n, init), 0) > 1:
                return None
            cands_init = [(cf, cg, oc) for (cf, cg, oc) in cands if _initial_from_given(cg) == init]
            if len(cands_init) == 1:
                return cands_init[0][2]
            elif len(cands_init) > 1:
                orcids = {oc for (_, _, oc) in cands_init if oc}
                return orcids.pop() if len(orcids) == 1 else None

        return None

    # --- 4) Build the lists ---
    for agent in agents_list:
        role = agent.get("role", "")
        fam, giv, display = _format_person(agent)
        if not display:
            continue

        oc = None
        # 1) ORCID from the metadata
        for key in ("orcid", "ORCID"):
            if key in agent and agent[key]:
                raw = agent[key][0] if isinstance(agent[key], list) else agent[key]
                oc = self.find_crossref_orcid(raw, doi)
                # Only the first populated key is tried, even if it yields no ORCID.
                break

        # 2) DOI→ORCID index
        if not oc and candidates:
            oc = _match_orcid(fam, giv, role)

        if oc:
            if not oc.startswith("orcid:"):
                oc = f"orcid:{oc}"
            display = f"{display} [{oc}]"
            self.tmp_orcid_m.storage_manager.set_value(oc, True)

        if role == "author":
            authors_strings_list.append(display)
        elif role == "editor":
            editors_string_list.append(display)

    return authors_strings_list, editors_string_list

413 

def find_crossref_orcid(self, identifier, doi):
    """Return the normalised ORCID for ``identifier`` if it can be trusted, else ``""``.

    Checks run in this order and stop at the first decisive one:

    1. ``identifier`` must be a string that ``self.orcid_m.normalise`` accepts.
    2. ``self.validated_as`` — ``True`` accepts, ``False`` rejects, anything
       else continues.
    3. The ORCIDs that ``self.orcid_finder`` returns for the DOI (tried in
       its normalised form and, when it has no ``doi:`` prefix, as given).
       A hit is recorded through ``self.tmp_orcid_m.storage_manager``.
    4. With ``self.use_orcid_api`` off: membership in
       ``self._redis_values_ra`` (a hit is recorded the same way).
       With it on: membership in ``self.to_validated_id_list(...)``.
    """
    if not isinstance(identifier, str):
        return ""

    norm_orcid = self.orcid_m.normalise(identifier, include_prefix=True)
    if not norm_orcid:
        return ""

    verdict = self.validated_as({"schema": "orcid", "identifier": norm_orcid})
    if isinstance(verdict, bool):
        # An explicit True/False is final; any other value means "undecided".
        return norm_orcid if verdict else ""

    # Keys under which this DOI may appear in the DOI→ORCID index.
    lookup_keys = []
    if doi:
        lookup_keys.append(self.doi_m.normalise(doi, include_prefix=True))
        if not str(doi).lower().startswith("doi:"):
            lookup_keys.append(doi)

    indexed_orcids = set()
    for lookup_key in lookup_keys:
        if not lookup_key:
            continue
        hits = self.orcid_finder(lookup_key)
        if hits:
            indexed_orcids.update(k.replace("orcid:", "").strip() for k in hits.keys())

    if norm_orcid.split(":", 1)[1] in indexed_orcids:
        self.tmp_orcid_m.storage_manager.set_value(norm_orcid, True)
        return norm_orcid

    if self.use_orcid_api:
        # API enabled: defer to the id-manager validation.
        id_descriptor = {"id": norm_orcid, "schema": "orcid"}
        return norm_orcid if norm_orcid in self.to_validated_id_list(id_descriptor) else ""

    # API disabled: fall back to the Redis snapshot of known agents.
    if norm_orcid in self._redis_values_ra:
        self.tmp_orcid_m.storage_manager.set_value(norm_orcid, True)
        return norm_orcid
    return ""