Coverage report for oc_ds_converter/crossref/crossref_processing.py: 86% of 312 statements covered (coverage.py v7.13.4, created at 2026-03-25 18:06 +0000).

1# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com> 

2# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it> 

4# SPDX-FileCopyrightText: 2023-2025 Arianna Moretti <arianna.moretti4@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8from __future__ import annotations 

9 

10import re 

11from collections import defaultdict 

12from typing import List, Optional, Tuple 

13 

14from oc_ds_converter.lib.cleaner import Cleaner 

15from oc_ds_converter.lib.crossref_style_processing import CrossrefStyleProcessing 

16from oc_ds_converter.lib.master_of_regex import ids_inside_square_brackets, pages_separator 

17from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

18 

19 

class CrossrefProcessing(CrossrefStyleProcessing):
    """Processor for Crossref JSON items: builds OpenCitations CSV rows
    (csv_creator) and collects ids to validate (extract_all_ids)."""

    def __init__(
        self,
        orcid_index: str | None = None,
        publishers_filepath: str | None = None,
        storage_manager: StorageManager | None = None,
        testing: bool = True,
        citing: bool = True,
        use_orcid_api: bool = True,
        use_redis_orcid_index: bool = False,
        use_redis_publishers: bool = False,
        exclude_existing: bool = False,
    ):
        """Initialise the processor.

        All parameters except ``exclude_existing`` are forwarded verbatim to
        the CrossrefStyleProcessing base class.

        :param orcid_index: path/locator of the DOI->ORCID index.
        :param publishers_filepath: path of the publishers mapping file.
        :param storage_manager: storage backend for validated ids.
        :param testing: run in testing mode (base-class semantics).
        :param citing: whether the processed entities are citing entities.
        :param use_orcid_api: allow live ORCID API validation.
        :param use_redis_orcid_index: read the ORCID index from Redis.
        :param use_redis_publishers: read the publishers mapping from Redis.
        :param exclude_existing: stored on the instance only; presumably lets
            callers skip entities already present in storage — not consumed
            anywhere in this chunk, verify against callers.
        """
        super().__init__(
            orcid_index=orcid_index,
            publishers_filepath=publishers_filepath,
            storage_manager=storage_manager,
            testing=testing,
            citing=citing,
            use_redis_orcid_index=use_redis_orcid_index,
            use_orcid_api=use_orcid_api,
            use_redis_publishers=use_redis_publishers,
        )
        # Only this flag is handled locally; everything else lives in the base.
        self.exclude_existing = exclude_existing

45 

46 def _extract_doi(self, item: dict) -> str: 

47 doi = item.get('DOI', '') 

48 if isinstance(doi, list): 

49 doi = doi[0] if doi else '' 

50 return str(doi) 

51 

52 def _extract_title(self, item: dict) -> str: 

53 title = item.get('title') 

54 if not title: 

55 return '' 

56 if isinstance(title, list): 

57 title = title[0] if title else '' 

58 return self.clean_markup(str(title)) 

59 

60 def _extract_agents(self, item: dict) -> list[dict]: 

61 agents_list: list[dict] = [] 

62 if 'author' in item: 

63 for author in item['author']: 

64 agents_list.append({**author, 'role': 'author'}) 

65 if 'editor' in item: 

66 for editor in item['editor']: 

67 agents_list.append({**editor, 'role': 'editor'}) 

68 return agents_list 

69 

70 def _extract_venue(self, item: dict) -> str: 

71 item_type = self._extract_type(item) 

72 return self.get_venue_name(item, {'type': item_type}) 

73 

74 def _extract_pub_date(self, item: dict) -> str: 

75 if 'issued' not in item: 

76 return '' 

77 date_parts = item['issued'].get('date-parts', [[]]) 

78 if date_parts and date_parts[0] and date_parts[0][0]: 

79 return '-'.join([str(y) for y in date_parts[0]]) 

80 return '' 

81 

82 def _extract_pages(self, item: dict) -> str: 

83 if 'page' not in item: 

84 return '' 

85 pages_list = re.split(pages_separator, item['page']) 

86 return self.get_pages(pages_list) 

87 

88 def _extract_type(self, item: dict) -> str: 

89 item_type = item.get('type', '') 

90 if item_type: 

91 return item_type.replace('-', ' ') 

92 return '' 

93 

94 def _extract_publisher(self, item: dict) -> str: 

95 doi = self._extract_doi(item) 

96 if not doi: 

97 return '' 

98 norm_doi = self.doi_m.normalise(doi, include_prefix=False) 

99 if not norm_doi: 

100 return '' 

101 return self.get_publisher_name(norm_doi, item) 

102 

    def csv_creator(self, item: dict) -> dict:
        """Convert one Crossref JSON item into an OpenCitations CSV row dict.

        :param item: the Crossref item.
        :return: a dict with keys id/title/author/issue/volume/venue/pub_date/
            page/type/publisher/editor (unicode-normalised), or {} when the
            item has no DOI or the DOI cannot be normalised.
        """
        doi = self._extract_doi(item)
        if not doi:
            return {}

        norm_doi = self.doi_m.normalise(doi, include_prefix=False)
        if not norm_doi:
            return {}

        item_type = self._extract_type(item)

        # Build ID (DOI + optional ISBN/ISSN based on type)
        ids_list = [f'doi:{norm_doi}']
        if 'ISBN' in item:
            # ISBNs identify the item itself only for book-like types.
            if item_type in {'book', 'dissertation', 'edited book', 'monograph', 'reference book', 'report', 'standard'}:
                self.id_worker(item['ISBN'], ids_list, self.isbn_worker)
        if 'ISSN' in item:
            # ISSNs identify the item itself only for serial types.
            if item_type in {'book series', 'book set', 'journal', 'proceedings series', 'series', 'standard series'}:
                self.id_worker(item['ISSN'], ids_list, self.issn_worker)
            elif item_type == 'report series':
                # A report series without a container-title owns its ISSN;
                # otherwise the ISSN belongs to the venue (see get_venue_name).
                if not item.get('container-title'):
                    self.id_worker(item['ISSN'], ids_list, self.issn_worker)

        agents_list = self._extract_agents(item)
        authors_strings_list, editors_string_list = self.get_agents_strings_list(norm_doi, agents_list)

        row = {
            'id': ' '.join(ids_list),
            'title': self._extract_title(item),
            'author': '; '.join(authors_strings_list),
            'issue': self._extract_issue(item),
            'volume': self._extract_volume(item),
            'venue': self._extract_venue(item),
            'pub_date': self._extract_pub_date(item),
            'page': self._extract_pages(item),
            'type': item_type,
            'publisher': self._extract_publisher(item),
            'editor': '; '.join(editors_string_list)
        }
        return self.normalise_unicode(row)

143 

144 def get_crossref_pages(self, item:dict) -> str: 

145 pages_list = re.split(pages_separator, item['page']) 

146 return self.get_pages(pages_list) 

147 

148 def get_publisher_name(self, doi: str, item: dict) -> str: 

149 publisher = item.get('publisher', '') 

150 member = item.get('member') 

151 prefix = item.get('prefix') or doi.split('/')[0] 

152 

153 if member: 

154 name = self._get_publisher_name_by_member(str(member)) 

155 if name: 

156 return f'{name} [crossref:{member}]' 

157 

158 result = self.get_publisher_by_prefix(prefix) 

159 if result: 

160 name, member_id = result 

161 return f'{name} [crossref:{member_id}]' 

162 

163 return f'{publisher} [crossref:{member}]' if member else publisher 

164 

165 def _get_publisher_name_by_member(self, member: str) -> str | None: 

166 if self.use_redis_publishers and self._publishers_redis: 

167 pub_data = self._publishers_redis.get_by_member(member) 

168 if pub_data: 

169 return str(pub_data['name']) 

170 return None 

171 if self.publishers_mapping and member in self.publishers_mapping: 

172 return str(self.publishers_mapping[member]['name']) 

173 return None 

174 

175 

    def get_venue_name(self, item: dict, row: dict) -> str:
        """Return the venue (container) title of *item*, optionally suffixed
        with the venue's ISBN/ISSN ids in square brackets.

        :param item: the Crossref item.
        :param row: partial output row; only row['type'] is read here.
        :return: e.g. "Journal of X [issn:1234-5678]", or '' when the item
            has no container-title.
        """
        name_and_id = ''
        if 'container-title' in item:
            if item['container-title']:
                # Crossref usually ships container-title as a list; take the first entry.
                if isinstance(item['container-title'], list):
                    ventit = str(item['container-title'][0])
                else:
                    ventit = str(item['container-title'])
                ventit = self.clean_markup(ventit)
                # If the title itself contains ids inside square brackets, turn
                # those brackets into parentheses so they cannot be confused
                # with the id block appended below.
                ambiguous_brackets = re.search(ids_inside_square_brackets, ventit)
                if ambiguous_brackets:
                    match = ambiguous_brackets.group(1)
                    # NOTE(review): this assumes the first occurrence of `match`
                    # in the title is the bracketed one and that '[' immediately
                    # precedes it — confirm against ids_inside_square_brackets.
                    open_bracket = ventit.find(match) - 1
                    close_bracket = ventit.find(match) + len(match)
                    ventit = ventit[:open_bracket] + '(' + ventit[open_bracket + 1:]
                    ventit = ventit[:close_bracket] + ')' + ventit[close_bracket + 1:]
                venids_list = list()
                # Venue-level ISBNs apply only when the item is a part of a book.
                if 'ISBN' in item:
                    if row['type'] in {'book chapter', 'book part', 'book section', 'book track', 'reference entry'}:
                        self.id_worker(item['ISBN'], venids_list, self.isbn_worker)

                # Venue-level ISSNs apply to items published inside a serial.
                if 'ISSN' in item:
                    if row['type'] in {'book', 'data file', 'dataset', 'edited book', 'journal article', 'journal volume', 'journal issue', 'monograph', 'proceedings', 'peer review', 'reference book', 'reference entry', 'report'}:
                        self.id_worker(item['ISSN'], venids_list, self.issn_worker)
                    elif row['type'] == 'report series':
                        # For a report series the ISSN belongs to the venue only
                        # when a container-title is actually present.
                        if 'container-title' in item:
                            if item['container-title']:
                                self.id_worker(item['ISSN'], venids_list, self.issn_worker)
                if venids_list:
                    name_and_id = ventit + ' [' + ' '.join(venids_list) + ']'
                else:
                    name_and_id = ventit
        return name_and_id

209 

210 def extract_all_ids( 

211 self, entity_dict: dict, is_citing: bool 

212 ) -> tuple[list[str], list[str]]: 

213 all_br: set[str] = set() 

214 all_ra: set[str] = set() 

215 

216 if is_citing: 

217 # VALIDATE RESPONSIBLE AGENTS IDS FOR THE CITING ENTITY 

218 if entity_dict.get("author"): 

219 for author in entity_dict["author"]: 

220 if "ORCID" in author: 

221 orcid = self.orcid_m.normalise(author["ORCID"], include_prefix=True) 

222 if orcid: 

223 all_ra.add(orcid) 

224 

225 if entity_dict.get("editor"): 

226 for editor in entity_dict["editor"]: 

227 if "ORCID" in editor: 

228 orcid = self.orcid_m.normalise(editor["ORCID"], include_prefix=True) 

229 if orcid: 

230 all_ra.add(orcid) 

231 

232 # RETRIEVE CITED IDS OF A CITING ENTITY 

233 else: 

234 citations = [x for x in entity_dict.get("reference", []) if x.get("DOI")] 

235 for cit in citations: 

236 norm_id = self.doi_m.normalise(cit["DOI"], include_prefix=True) 

237 if norm_id: 

238 all_br.add(norm_id) 

239 

240 return list(all_br), list(all_ra) 

241 

    def get_agents_strings_list(self, doi: str, agents_list: List[dict]) -> Tuple[list, list]:
        """Build display strings ("Family, Given [orcid:...]") for authors and editors.

        ORCIDs are attached either from the agent's own metadata
        (find_crossref_orcid) or, failing that, by matching names against the
        DOI->ORCID index, with per-role homonym disambiguation so that
        ambiguous names are left untagged.

        :param doi: normalised DOI of the entity (used for the index lookup).
        :param agents_list: agent dicts, each carrying a 'role' key
            ('author' or 'editor').
        :return: (authors_strings_list, editors_string_list).
        """
        authors_strings_list = []
        editors_string_list = []

        # --- 1) DOI -> index lookup ---
        raw_index = self.orcid_finder(doi) if doi else None

        # --- 2) Parse the index into candidates (family, given, normalised orcid) ---
        def _split_name(text: str) -> Tuple[str, Optional[str]]:
            # Strip a trailing "[...]" id block, then split "Family, Given";
            # without a comma, treat the last token as the family name.
            base = re.sub(r'\s*\[.*?\]\s*$', '', text).strip()
            if ',' in base:
                fam, giv = [p.strip() for p in base.split(',', 1)]
                return fam, (giv if giv else None)
            toks = base.split()
            if len(toks) >= 2:
                return toks[-1], ' '.join(toks[:-1])
            return base, None

        candidates: List[Tuple[str, Optional[str], str]] = []
        if raw_index:
            for k, v in raw_index.items():
                oc = k if str(k).lower().startswith("orcid:") else f"orcid:{k}"
                oc_norm = self.orcid_m.normalise(oc, include_prefix=True)
                if not oc_norm:
                    continue
                fam, giv = _split_name(v)
                candidates.append((fam.lower(), (giv or "").lower() or None, oc_norm))

        # --- 3) Clean the agents' name fields ---
        agents_list = [
            {
                k: Cleaner(v).remove_unwanted_characters()
                if k in {"family", "given", "name"} and v is not None else v
                for k, v in agent_dict.items()
            }
            for agent_dict in agents_list
        ]

        def _format_person(a: dict) -> Tuple[Optional[str], Optional[str], Optional[str]]:
            # Return (family, given, display) for an agent; display is None
            # when no usable name parts exist.
            family = a.get("family")
            given = a.get("given")
            if family and given:
                return family, given, f"{family}, {given}"
            if a.get("name"):
                base = a["name"]
                if "," in base:
                    fam, giv = [p.strip() for p in base.split(",", 1)]
                    return fam, (giv if giv else None), (f"{fam}, {giv}" if giv else fam)
                return base, None, base
            if family:
                return family, given, f"{family}, {given or ''}"
            if given:
                return None, given, f", {given}"
            return None, None, None

        # --- local helpers for normalisation and initials ---
        def _norm(s: str) -> str:
            return re.sub(r"\s+", " ", (s or "").strip().lower())

        def _initial_from_given(given: Optional[str]) -> str:
            if not given:
                return ""
            first_token = re.split(r"[\s\-]+", given.strip())[0]
            m = re.search(r"[A-Za-z0-9]", first_token)
            return m.group(0).lower() if m else ""

        # --- per-role disambiguation counters (author/editor) ---
        name_counts = defaultdict(int)  # key: (role, fam_norm, given_norm)
        initial_counts = defaultdict(int)  # key: (role, fam_norm, initial)

        for _a in agents_list:
            role_n = (_a.get("role") or "").strip().lower()
            fam_n = _norm(_a.get("family") or "")
            giv_n = _norm(_a.get("given") or "")
            ini_n = _initial_from_given(_a.get("given"))
            name_counts[(role_n, fam_n, giv_n)] += 1
            if fam_n and ini_n:
                initial_counts[(role_n, fam_n, ini_n)] += 1

        # --- set of (family_norm, given_norm) pairs present in the batch ---
        name_pairs = set()
        for _a in agents_list:
            fam_n = _norm(_a.get("family") or "")
            giv_n = _norm(_a.get("given") or "")
            name_pairs.add((fam_n, giv_n))

        def _match_orcid(fam: Optional[str], giv: Optional[str], role: str) -> Optional[str]:
            # Try to find a unique index ORCID for (family, given) within `role`;
            # return None whenever the match would be ambiguous.
            if not fam:
                return None

            role_n = (role or "").strip().lower()
            fam_n = _norm(fam)
            giv_n = _norm(giv) if giv else ""
            init = _initial_from_given(giv)

            # filter index candidates by family (tolerating containment)
            def fam_ok(cf: str) -> bool:
                cf_n = _norm(cf)
                return cf_n == fam_n or cf_n in fam_n or fam_n in cf_n

            cands = [(cf, cg, oc) for (cf, cg, oc) in candidates if fam_ok(cf)]
            if not cands:
                return None

            # A) perfect homonyms within the *same role* -> do not tag anyone
            if name_counts.get((role_n, fam_n, giv_n), 0) > 1:
                return None

            # 1) STRONG MATCH: full given name equal
            strong = [(cf, cg, oc) for (cf, cg, oc) in cands if cg and giv_n and _norm(cg) == giv_n]
            if len(strong) == 1:
                return strong[0][2]
            elif len(strong) > 1:
                orcids = {oc for (_, _, oc) in strong if oc}
                return orcids.pop() if len(orcids) == 1 else None

            # --- inversion guard ---
            # If the batch contains the swapped pair (family=cg, given=fam) for
            # an index candidate, disable the initial-based fallback (avoids
            # tagging a false positive).
            inversion_present = any(
                cg and (_norm(cg), fam_n) in name_pairs
                for (_, cg, _) in cands
            )
            if inversion_present:
                return None

            # 2) INITIAL-BASED FALLBACK (only when UNIQUE *within the same role*)
            if init:
                if initial_counts.get((role_n, fam_n, init), 0) > 1:
                    return None
                cands_init = [(cf, cg, oc) for (cf, cg, oc) in cands if _initial_from_given(cg) == init]
                if len(cands_init) == 1:
                    return cands_init[0][2]
                elif len(cands_init) > 1:
                    orcids = {oc for (_, _, oc) in cands_init if oc}
                    return orcids.pop() if len(orcids) == 1 else None

            return None

        # --- 4) Build the output lists ---
        for agent in agents_list:
            role = agent.get("role", "")
            fam, giv, display = _format_person(agent)
            if not display:
                continue

            oc = None
            # 1) ORCID carried in the agent's own metadata
            for key in ("orcid", "ORCID"):
                if key in agent and agent[key]:
                    raw = agent[key][0] if isinstance(agent[key], list) else agent[key]
                    oc = self.find_crossref_orcid(raw, doi)
                    break

            # 2) DOI->ORCID index
            if not oc and candidates:
                oc = _match_orcid(fam, giv, role)

            if oc:
                if not oc.startswith("orcid:"):
                    oc = f"orcid:{oc}"
                display = f"{display} [{oc}]"
                # Remember the confirmed ORCID for later reuse.
                self.tmp_orcid_m.storage_manager.set_value(oc, True)

            if role == "author":
                authors_strings_list.append(display)
            elif role == "editor":
                editors_string_list.append(display)

        return authors_strings_list, editors_string_list

412 

    def find_crossref_orcid(self, identifier, doi):
        """Validate an ORCID found in Crossref metadata and return it
        normalised (with the 'orcid:' prefix), or '' when unconfirmed.

        Confirmation cascade: cached validity verdict, DOI->ORCID index,
        then — depending on configuration — the Redis snapshot or validation
        through the id manager.

        :param identifier: raw ORCID string from the item (may be a URL form).
        :param doi: DOI of the item the ORCID is attached to (for index lookup).
        """
        if not isinstance(identifier, str):
            return ""

        norm_orcid = self.orcid_m.normalise(identifier, include_prefix=True)
        if not norm_orcid:
            return ""

        # Short-circuit on a previously cached validity verdict.
        validity = self.validated_as({"schema": "orcid", "identifier": norm_orcid})
        if validity is True:
            return norm_orcid
        if validity is False:
            return ""

        # Look the DOI up under both its normalised ('doi:'-prefixed) and raw
        # spellings, since the index may store either.
        norm_doi = self.doi_m.normalise(doi, include_prefix=True) if doi else None
        alt_doi = doi if (doi and not str(doi).lower().startswith("doi:")) else None
        found_orcids = set()

        # DOI→ORCID index
        for candidate in (norm_doi, alt_doi):
            if not candidate:
                continue
            raw = self.orcid_finder(candidate)
            if raw:
                found_orcids.update(k.replace("orcid:", "").strip() for k in raw.keys())

        # Compare the bare (prefix-less) ORCID against the index hits.
        bare_orcid = norm_orcid.split(":", 1)[1]
        if bare_orcid in found_orcids:
            self.tmp_orcid_m.storage_manager.set_value(norm_orcid, True)
            return norm_orcid

        # API OFF → Redis snapshot fallback
        if not self.use_orcid_api:
            if norm_orcid in self._redis_values_ra:
                self.tmp_orcid_m.storage_manager.set_value(norm_orcid, True)
                return norm_orcid
            return ""

        # API ON → Redis + validation through the id manager
        norm_id_dict = {"id": norm_orcid, "schema": "orcid"}
        if norm_orcid in self.to_validated_id_list(norm_id_dict):
            return norm_orcid

        return ""

456 return ""