Coverage for oc_ds_converter / crossref / crossref_processing.py: 86%
312 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com>
2# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it>
4# SPDX-FileCopyrightText: 2023-2025 Arianna Moretti <arianna.moretti4@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8from __future__ import annotations
10import re
11from collections import defaultdict
12from typing import List, Optional, Tuple
14from oc_ds_converter.lib.cleaner import Cleaner
15from oc_ds_converter.lib.crossref_style_processing import CrossrefStyleProcessing
16from oc_ds_converter.lib.master_of_regex import ids_inside_square_brackets, pages_separator
17from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
class CrossrefProcessing(CrossrefStyleProcessing):
    """Processor that converts raw Crossref JSON records into OpenCitations
    Meta CSV rows.

    Crossref-specific extraction (identifiers, agents, venue, publisher)
    lives here; shared machinery (markup cleaning, id workers, the DOI and
    ORCID managers) is inherited from :class:`CrossrefStyleProcessing`.
    """

    def __init__(
        self,
        orcid_index: str | None = None,
        publishers_filepath: str | None = None,
        storage_manager: StorageManager | None = None,
        testing: bool = True,
        citing: bool = True,
        use_orcid_api: bool = True,
        use_redis_orcid_index: bool = False,
        use_redis_publishers: bool = False,
        exclude_existing: bool = False,
    ):
        """Initialise the processor.

        Every parameter except ``exclude_existing`` is forwarded verbatim
        to the base class.

        :param exclude_existing: stored as ``self.exclude_existing``.
            NOTE(review): the flag is only assigned here and not read in
            this chunk — presumably consumers use it to skip entities
            already present in storage; confirm against the callers.
        """
        super().__init__(
            orcid_index=orcid_index,
            publishers_filepath=publishers_filepath,
            storage_manager=storage_manager,
            testing=testing,
            citing=citing,
            use_redis_orcid_index=use_redis_orcid_index,
            use_orcid_api=use_orcid_api,
            use_redis_publishers=use_redis_publishers,
        )
        self.exclude_existing = exclude_existing
46 def _extract_doi(self, item: dict) -> str:
47 doi = item.get('DOI', '')
48 if isinstance(doi, list):
49 doi = doi[0] if doi else ''
50 return str(doi)
52 def _extract_title(self, item: dict) -> str:
53 title = item.get('title')
54 if not title:
55 return ''
56 if isinstance(title, list):
57 title = title[0] if title else ''
58 return self.clean_markup(str(title))
60 def _extract_agents(self, item: dict) -> list[dict]:
61 agents_list: list[dict] = []
62 if 'author' in item:
63 for author in item['author']:
64 agents_list.append({**author, 'role': 'author'})
65 if 'editor' in item:
66 for editor in item['editor']:
67 agents_list.append({**editor, 'role': 'editor'})
68 return agents_list
70 def _extract_venue(self, item: dict) -> str:
71 item_type = self._extract_type(item)
72 return self.get_venue_name(item, {'type': item_type})
74 def _extract_pub_date(self, item: dict) -> str:
75 if 'issued' not in item:
76 return ''
77 date_parts = item['issued'].get('date-parts', [[]])
78 if date_parts and date_parts[0] and date_parts[0][0]:
79 return '-'.join([str(y) for y in date_parts[0]])
80 return ''
82 def _extract_pages(self, item: dict) -> str:
83 if 'page' not in item:
84 return ''
85 pages_list = re.split(pages_separator, item['page'])
86 return self.get_pages(pages_list)
88 def _extract_type(self, item: dict) -> str:
89 item_type = item.get('type', '')
90 if item_type:
91 return item_type.replace('-', ' ')
92 return ''
94 def _extract_publisher(self, item: dict) -> str:
95 doi = self._extract_doi(item)
96 if not doi:
97 return ''
98 norm_doi = self.doi_m.normalise(doi, include_prefix=False)
99 if not norm_doi:
100 return ''
101 return self.get_publisher_name(norm_doi, item)
103 def csv_creator(self, item: dict) -> dict:
104 doi = self._extract_doi(item)
105 if not doi:
106 return {}
108 norm_doi = self.doi_m.normalise(doi, include_prefix=False)
109 if not norm_doi:
110 return {}
112 item_type = self._extract_type(item)
114 # Build ID (DOI + optional ISBN/ISSN based on type)
115 ids_list = [f'doi:{norm_doi}']
116 if 'ISBN' in item:
117 if item_type in {'book', 'dissertation', 'edited book', 'monograph', 'reference book', 'report', 'standard'}:
118 self.id_worker(item['ISBN'], ids_list, self.isbn_worker)
119 if 'ISSN' in item:
120 if item_type in {'book series', 'book set', 'journal', 'proceedings series', 'series', 'standard series'}:
121 self.id_worker(item['ISSN'], ids_list, self.issn_worker)
122 elif item_type == 'report series':
123 if not item.get('container-title'):
124 self.id_worker(item['ISSN'], ids_list, self.issn_worker)
126 agents_list = self._extract_agents(item)
127 authors_strings_list, editors_string_list = self.get_agents_strings_list(norm_doi, agents_list)
129 row = {
130 'id': ' '.join(ids_list),
131 'title': self._extract_title(item),
132 'author': '; '.join(authors_strings_list),
133 'issue': self._extract_issue(item),
134 'volume': self._extract_volume(item),
135 'venue': self._extract_venue(item),
136 'pub_date': self._extract_pub_date(item),
137 'page': self._extract_pages(item),
138 'type': item_type,
139 'publisher': self._extract_publisher(item),
140 'editor': '; '.join(editors_string_list)
141 }
142 return self.normalise_unicode(row)
144 def get_crossref_pages(self, item:dict) -> str:
145 pages_list = re.split(pages_separator, item['page'])
146 return self.get_pages(pages_list)
148 def get_publisher_name(self, doi: str, item: dict) -> str:
149 publisher = item.get('publisher', '')
150 member = item.get('member')
151 prefix = item.get('prefix') or doi.split('/')[0]
153 if member:
154 name = self._get_publisher_name_by_member(str(member))
155 if name:
156 return f'{name} [crossref:{member}]'
158 result = self.get_publisher_by_prefix(prefix)
159 if result:
160 name, member_id = result
161 return f'{name} [crossref:{member_id}]'
163 return f'{publisher} [crossref:{member}]' if member else publisher
165 def _get_publisher_name_by_member(self, member: str) -> str | None:
166 if self.use_redis_publishers and self._publishers_redis:
167 pub_data = self._publishers_redis.get_by_member(member)
168 if pub_data:
169 return str(pub_data['name'])
170 return None
171 if self.publishers_mapping and member in self.publishers_mapping:
172 return str(self.publishers_mapping[member]['name'])
173 return None
    def get_venue_name(self, item:dict, row:dict) -> str:
        """Return the venue display name for *item*, with venue ids appended.

        The container title is cleaned of markup; when it embeds an id-like
        token in square brackets, those brackets are converted to
        parentheses so they cannot be confused with the trailing
        ``[id ...]`` block this method appends. ISBN/ISSN ids are attached
        depending on ``row['type']``.

        :param item: raw Crossref record
        :param row: partial output row; only ``row['type']`` is read
        :return: ``'Name [ids]'``, plain ``'Name'``, or ``''`` when absent
        """
        name_and_id = ''
        if 'container-title' in item:
            if item['container-title']:
                # Crossref usually serialises container-title as a list;
                # only the first entry is used.
                if isinstance(item['container-title'], list):
                    ventit = str(item['container-title'][0])
                else:
                    ventit = str(item['container-title'])
                ventit = self.clean_markup(ventit)
                ambiguous_brackets = re.search(ids_inside_square_brackets, ventit)
                if ambiguous_brackets:
                    # Swap the '[' preceding the match and the ']' following it
                    # for parentheses. NOTE(review): the index arithmetic
                    # assumes the FIRST occurrence of the matched text is the
                    # bracketed one — confirm against titles that repeat it.
                    match = ambiguous_brackets.group(1)
                    open_bracket = ventit.find(match) - 1
                    close_bracket = ventit.find(match) + len(match)
                    ventit = ventit[:open_bracket] + '(' + ventit[open_bracket + 1:]
                    ventit = ventit[:close_bracket] + ')' + ventit[close_bracket + 1:]
                venids_list = list()
                # An ISBN identifies the venue only for parts of books.
                if 'ISBN' in item:
                    if row['type'] in {'book chapter', 'book part', 'book section', 'book track', 'reference entry'}:
                        self.id_worker(item['ISBN'], venids_list, self.isbn_worker)
                if 'ISSN' in item:
                    if row['type'] in {'book', 'data file', 'dataset', 'edited book', 'journal article', 'journal volume', 'journal issue', 'monograph', 'proceedings', 'peer review', 'reference book', 'reference entry', 'report'}:
                        self.id_worker(item['ISSN'], venids_list, self.issn_worker)
                    elif row['type'] == 'report series':
                        # For report series the ISSN belongs to the venue only
                        # when a container title is actually present.
                        if 'container-title' in item:
                            if item['container-title']:
                                self.id_worker(item['ISSN'], venids_list, self.issn_worker)
                if venids_list:
                    name_and_id = ventit + ' [' + ' '.join(venids_list) + ']'
                else:
                    name_and_id = ventit
        return name_and_id
210 def extract_all_ids(
211 self, entity_dict: dict, is_citing: bool
212 ) -> tuple[list[str], list[str]]:
213 all_br: set[str] = set()
214 all_ra: set[str] = set()
216 if is_citing:
217 # VALIDATE RESPONSIBLE AGENTS IDS FOR THE CITING ENTITY
218 if entity_dict.get("author"):
219 for author in entity_dict["author"]:
220 if "ORCID" in author:
221 orcid = self.orcid_m.normalise(author["ORCID"], include_prefix=True)
222 if orcid:
223 all_ra.add(orcid)
225 if entity_dict.get("editor"):
226 for editor in entity_dict["editor"]:
227 if "ORCID" in editor:
228 orcid = self.orcid_m.normalise(editor["ORCID"], include_prefix=True)
229 if orcid:
230 all_ra.add(orcid)
232 # RETRIEVE CITED IDS OF A CITING ENTITY
233 else:
234 citations = [x for x in entity_dict.get("reference", []) if x.get("DOI")]
235 for cit in citations:
236 norm_id = self.doi_m.normalise(cit["DOI"], include_prefix=True)
237 if norm_id:
238 all_br.add(norm_id)
240 return list(all_br), list(all_ra)
    def get_agents_strings_list(self, doi: str, agents_list: List[dict]) -> Tuple[list, list]:
        """Build display strings for authors and editors, tagging ORCIDs.

        Each agent becomes a ``"Family, Given"`` string; when an ORCID can
        be attributed — from the agent's own metadata or from the
        DOI→ORCID index — it is appended as ``[orcid:...]`` and recorded as
        validated. Ambiguous cases (same-role homonyms, shared initials,
        inverted family/given pairs) are deliberately left untagged.

        :param doi: normalised DOI of the record (drives the index lookup)
        :param agents_list: agent dicts, each carrying a 'role' key
        :return: (author display strings, editor display strings)
        """
        authors_strings_list = []
        editors_string_list = []

        # --- 1) DOI → index lookup ---
        raw_index = self.orcid_finder(doi) if doi else None

        # --- 2) Parse the index into candidates: (family, given, normalised orcid) ---
        def _split_name(text: str) -> Tuple[str, Optional[str]]:
            # Strip a trailing "[...]" tag, then split on the first comma;
            # without a comma, treat the last token as the family name.
            base = re.sub(r'\s*\[.*?\]\s*$', '', text).strip()
            if ',' in base:
                fam, giv = [p.strip() for p in base.split(',', 1)]
                return fam, (giv if giv else None)
            toks = base.split()
            if len(toks) >= 2:
                return toks[-1], ' '.join(toks[:-1])
            return base, None

        candidates: List[Tuple[str, Optional[str], str]] = []
        if raw_index:
            for k, v in raw_index.items():
                oc = k if str(k).lower().startswith("orcid:") else f"orcid:{k}"
                oc_norm = self.orcid_m.normalise(oc, include_prefix=True)
                if not oc_norm:
                    continue
                fam, giv = _split_name(v)
                candidates.append((fam.lower(), (giv or "").lower() or None, oc_norm))

        # --- 3) Clean agent name fields ---
        agents_list = [
            {
                k: Cleaner(v).remove_unwanted_characters()
                if k in {"family", "given", "name"} and v is not None else v
                for k, v in agent_dict.items()
            }
            for agent_dict in agents_list
        ]

        def _format_person(a: dict) -> Tuple[Optional[str], Optional[str], Optional[str]]:
            # Returns (family, given, display string); display is None when
            # the agent carries no usable name at all.
            family = a.get("family")
            given = a.get("given")
            if family and given:
                return family, given, f"{family}, {given}"
            if a.get("name"):
                base = a["name"]
                if "," in base:
                    fam, giv = [p.strip() for p in base.split(",", 1)]
                    return fam, (giv if giv else None), (f"{fam}, {giv}" if giv else fam)
                return base, None, base
            if family:
                return family, given, f"{family}, {given or ''}"
            if given:
                return None, given, f", {given}"
            return None, None, None

        # --- local helpers for normalisation and first initial ---
        def _norm(s: str) -> str:
            return re.sub(r"\s+", " ", (s or "").strip().lower())

        def _initial_from_given(given: Optional[str]) -> str:
            if not given:
                return ""
            first_token = re.split(r"[\s\-]+", given.strip())[0]
            m = re.search(r"[A-Za-z0-9]", first_token)
            return m.group(0).lower() if m else ""

        # --- per-role (author/editor) disambiguation counters ---
        name_counts = defaultdict(int) # key: (role, fam_norm, given_norm)
        initial_counts = defaultdict(int) # key: (role, fam_norm, initial)

        for _a in agents_list:
            role_n = (_a.get("role") or "").strip().lower()
            fam_n = _norm(_a.get("family") or "")
            giv_n = _norm(_a.get("given") or "")
            ini_n = _initial_from_given(_a.get("given"))
            name_counts[(role_n, fam_n, giv_n)] += 1
            if fam_n and ini_n:
                initial_counts[(role_n, fam_n, ini_n)] += 1

        # --- set of (family_norm, given_norm) pairs present in this batch ---
        name_pairs = set()
        for _a in agents_list:
            fam_n = _norm(_a.get("family") or "")
            giv_n = _norm(_a.get("given") or "")
            name_pairs.add((fam_n, giv_n))

        def _match_orcid(fam: Optional[str], giv: Optional[str], role: str) -> Optional[str]:
            # Attribute an index ORCID to (fam, giv) within a role, or None
            # whenever the attribution would be ambiguous.
            if not fam:
                return None

            role_n = (role or "").strip().lower()
            fam_n = _norm(fam)
            giv_n = _norm(giv) if giv else ""
            init = _initial_from_given(giv)

            # filter index candidates by family (tolerating containment)
            def fam_ok(cf: str) -> bool:
                cf_n = _norm(cf)
                return cf_n == fam_n or cf_n in fam_n or fam_n in cf_n

            cands = [(cf, cg, oc) for (cf, cg, oc) in candidates if fam_ok(cf)]
            if not cands:
                return None

            # A) perfect homonyms within the *same role* → do not tag
            if name_counts.get((role_n, fam_n, giv_n), 0) > 1:
                return None

            # 1) STRONG MATCH: full given name equal
            strong = [(cf, cg, oc) for (cf, cg, oc) in cands if cg and giv_n and _norm(cg) == giv_n]
            if len(strong) == 1:
                return strong[0][2]
            elif len(strong) > 1:
                orcids = {oc for (_, _, oc) in strong if oc}
                return orcids.pop() if len(orcids) == 1 else None

            # --- inversion guard ---
            # If the batch contains the inverted pair (family=cg, given=fam)
            # for some index candidate, disable the initial-based fallback
            # (avoids tagging the false positive).
            inversion_present = any(
                cg and (_norm(cg), fam_n) in name_pairs
                for (_, cg, _) in cands
            )
            if inversion_present:
                return None

            # 2) INITIAL-BASED FALLBACK (only when unambiguous *within the same role*)
            if init:
                if initial_counts.get((role_n, fam_n, init), 0) > 1:
                    return None
                cands_init = [(cf, cg, oc) for (cf, cg, oc) in cands if _initial_from_given(cg) == init]
                if len(cands_init) == 1:
                    return cands_init[0][2]
                elif len(cands_init) > 1:
                    orcids = {oc for (_, _, oc) in cands_init if oc}
                    return orcids.pop() if len(orcids) == 1 else None

            return None

        # --- 4) Build the output lists ---
        for agent in agents_list:
            role = agent.get("role", "")
            fam, giv, display = _format_person(agent)
            if not display:
                continue

            oc = None
            # 1) ORCID from the agent's own metadata
            for key in ("orcid", "ORCID"):
                if key in agent and agent[key]:
                    raw = agent[key][0] if isinstance(agent[key], list) else agent[key]
                    oc = self.find_crossref_orcid(raw, doi)
                    break

            # 2) DOI→ORCID index
            if not oc and candidates:
                oc = _match_orcid(fam, giv, role)

            if oc:
                if not oc.startswith("orcid:"):
                    oc = f"orcid:{oc}"
                display = f"{display} [{oc}]"
                # Remember the attribution as a validated ORCID.
                self.tmp_orcid_m.storage_manager.set_value(oc, True)

            if role == "author":
                authors_strings_list.append(display)
            elif role == "editor":
                editors_string_list.append(display)

        return authors_strings_list, editors_string_list
413 def find_crossref_orcid(self, identifier, doi):
414 if not isinstance(identifier, str):
415 return ""
417 norm_orcid = self.orcid_m.normalise(identifier, include_prefix=True)
418 if not norm_orcid:
419 return ""
421 validity = self.validated_as({"schema": "orcid", "identifier": norm_orcid})
422 if validity is True:
423 return norm_orcid
424 if validity is False:
425 return ""
427 norm_doi = self.doi_m.normalise(doi, include_prefix=True) if doi else None
428 alt_doi = doi if (doi and not str(doi).lower().startswith("doi:")) else None
429 found_orcids = set()
431 # DOI→ORCID index
432 for candidate in (norm_doi, alt_doi):
433 if not candidate:
434 continue
435 raw = self.orcid_finder(candidate)
436 if raw:
437 found_orcids.update(k.replace("orcid:", "").strip() for k in raw.keys())
439 bare_orcid = norm_orcid.split(":", 1)[1]
440 if bare_orcid in found_orcids:
441 self.tmp_orcid_m.storage_manager.set_value(norm_orcid, True)
442 return norm_orcid
444 # API OFF → Redis snapshot fallback
445 if not self.use_orcid_api:
446 if norm_orcid in self._redis_values_ra:
447 self.tmp_orcid_m.storage_manager.set_value(norm_orcid, True)
448 return norm_orcid
449 return ""
451 # API ON → Redis + validazione manager
452 norm_id_dict = {"id": norm_orcid, "schema": "orcid"}
453 if norm_orcid in self.to_validated_id_list(norm_id_dict):
454 return norm_orcid
456 return ""