Coverage for oc_ds_converter / crossref / crossref_processing.py: 85%
312 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-12 21:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-12 21:23 +0000
1# SPDX-FileCopyrightText: 2019-2020 Fabio Mariani <fabio.mariani555@gmail.com>
2# SPDX-FileCopyrightText: 2021-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it>
4# SPDX-FileCopyrightText: 2023-2025 Arianna Moretti <arianna.moretti4@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8from __future__ import annotations
10import re
11from collections import defaultdict
12from typing import List, Optional, Tuple
14from oc_ds_converter.lib.cleaner import Cleaner
15from oc_ds_converter.lib.crossref_style_processing import CrossrefStyleProcessing
16from oc_ds_converter.lib.master_of_regex import ids_inside_square_brackets, pages_separator
17from oc_ds_converter.ra_processor import families_match
18from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
21class CrossrefProcessing(CrossrefStyleProcessing):
def __init__(
    self,
    orcid_index: str | None = None,
    publishers_filepath: str | None = None,
    storage_manager: StorageManager | None = None,
    testing: bool = True,
    citing: bool = True,
    use_orcid_api: bool = True,
    use_redis_orcid_index: bool = False,
    use_redis_publishers: bool = False,
    exclude_existing: bool = False,
):
    """Set up the processor and remember the ``exclude_existing`` flag.

    Every argument except ``exclude_existing`` is forwarded by keyword,
    unchanged, to the parent class constructor. ``exclude_existing`` is
    only stored on the instance.
    """
    # Options understood by the parent class, forwarded as-is.
    base_options = {
        'orcid_index': orcid_index,
        'publishers_filepath': publishers_filepath,
        'storage_manager': storage_manager,
        'testing': testing,
        'citing': citing,
        'use_redis_orcid_index': use_redis_orcid_index,
        'use_orcid_api': use_orcid_api,
        'use_redis_publishers': use_redis_publishers,
    }
    super().__init__(**base_options)
    self.exclude_existing = exclude_existing
47 def _extract_doi(self, item: dict) -> str:
48 doi = item.get('DOI', '')
49 if isinstance(doi, list):
50 doi = doi[0] if doi else ''
51 return str(doi)
53 def _extract_title(self, item: dict) -> str:
54 title = item.get('title')
55 if not title:
56 return ''
57 if isinstance(title, list):
58 title = title[0] if title else ''
59 return self.clean_markup(str(title))
61 def _extract_agents(self, item: dict) -> list[dict]:
62 agents_list: list[dict] = []
63 if 'author' in item:
64 for author in item['author']:
65 agents_list.append({**author, 'role': 'author'})
66 if 'editor' in item:
67 for editor in item['editor']:
68 agents_list.append({**editor, 'role': 'editor'})
69 return agents_list
71 def _extract_venue(self, item: dict) -> str:
72 item_type = self._extract_type(item)
73 return self.get_venue_name(item, {'type': item_type})
75 def _extract_pub_date(self, item: dict) -> str:
76 if 'issued' not in item:
77 return ''
78 date_parts = item['issued'].get('date-parts', [[]])
79 if date_parts and date_parts[0] and date_parts[0][0]:
80 return '-'.join([str(y) for y in date_parts[0]])
81 return ''
83 def _extract_pages(self, item: dict) -> str:
84 if 'page' not in item:
85 return ''
86 pages_list = re.split(pages_separator, item['page'])
87 return self.get_pages(pages_list)
89 def _extract_type(self, item: dict) -> str:
90 item_type = item.get('type', '')
91 if item_type:
92 return item_type.replace('-', ' ')
93 return ''
95 def _extract_publisher(self, item: dict) -> str:
96 doi = self._extract_doi(item)
97 if not doi:
98 return ''
99 norm_doi = self.doi_m.normalise(doi, include_prefix=False)
100 if not norm_doi:
101 return ''
102 return self.get_publisher_name(norm_doi, item)
def csv_creator(self, item: dict) -> dict:
    """Turn one work record into a metadata row.

    Returns ``{}`` when the record has no DOI or the DOI cannot be
    normalised. Otherwise the row has the keys ``id``, ``title``,
    ``author``, ``issue``, ``volume``, ``venue``, ``pub_date``, ``page``,
    ``type``, ``publisher`` and ``editor`` and is passed through
    ``self.normalise_unicode`` before being returned.

    The ``id`` cell always contains ``doi:<normalised doi>``. ISBNs or
    ISSNs are appended through ``self.id_worker`` only for the item types
    listed below, i.e. when the identifier belongs to the item itself.
    """
    raw_doi = self._extract_doi(item)
    if not raw_doi:
        return {}
    norm_doi = self.doi_m.normalise(raw_doi, include_prefix=False)
    if not norm_doi:
        return {}

    item_type = self._extract_type(item)

    # Types for which the ISBN / ISSN is attached to the item itself.
    isbn_types = {'book', 'dissertation', 'edited book', 'monograph', 'reference book', 'report', 'standard'}
    issn_types = {'book series', 'book set', 'journal', 'proceedings series', 'series', 'standard series'}

    ids_list = [f'doi:{norm_doi}']
    if 'ISBN' in item and item_type in isbn_types:
        self.id_worker(item['ISBN'], ids_list, self.isbn_worker)
    if 'ISSN' in item:
        # A report series owns its ISSN only when it has no container title.
        standalone_report_series = item_type == 'report series' and not item.get('container-title')
        if item_type in issn_types or standalone_report_series:
            self.id_worker(item['ISSN'], ids_list, self.issn_worker)

    agents = self._extract_agents(item)
    author_strings, editor_strings = self.get_agents_strings_list(norm_doi, agents)

    row = {}
    row['id'] = ' '.join(ids_list)
    row['title'] = self._extract_title(item)
    row['author'] = '; '.join(author_strings)
    row['issue'] = self._extract_issue(item)
    row['volume'] = self._extract_volume(item)
    row['venue'] = self._extract_venue(item)
    row['pub_date'] = self._extract_pub_date(item)
    row['page'] = self._extract_pages(item)
    row['type'] = item_type
    row['publisher'] = self._extract_publisher(item)
    row['editor'] = '; '.join(editor_strings)
    return self.normalise_unicode(row)
def get_crossref_pages(self, item:dict) -> str:
    """Return the page range built from ``item['page']``.

    The value is split on ``pages_separator`` and the fragments are passed
    to ``self.get_pages``. The ``page`` key must be present.
    """
    raw_pages = item['page']
    return self.get_pages(re.split(pages_separator, raw_pages))
def get_publisher_name(self, doi: str, item: dict) -> str:
    """Return the publisher of *item*, tagged with its Crossref member id.

    Resolution order:

    1. the item's ``member`` id, looked up through
       ``self._get_publisher_name_by_member``;
    2. the DOI prefix (``item['prefix']`` or the part of *doi* before the
       first ``/``), looked up through ``self.get_publisher_by_prefix``;
    3. the item's own ``publisher`` value, suffixed with the member id
       when the item has one.

    Args:
        doi: the DOI of the item, used to derive the prefix when the
            item does not carry one.
        item: the work record (``publisher``, ``member`` and ``prefix``
            keys are read).

    Returns:
        ``'<name> [crossref:<member>]'`` or a bare publisher name; ``''``
        when nothing is known. Never ``None``.
    """
    # A null publisher must degrade to '' instead of leaking None (or the
    # text "None") into the returned string.
    publisher = str(item.get('publisher') or '')
    member = item.get('member')
    prefix = item.get('prefix') or doi.split('/')[0]

    if member:
        name = self._get_publisher_name_by_member(str(member))
        if name:
            return f'{name} [crossref:{member}]'

    result = self.get_publisher_by_prefix(prefix)
    if result:
        name, member_id = result
        return f'{name} [crossref:{member_id}]'

    return f'{publisher} [crossref:{member}]' if member else publisher
166 def _get_publisher_name_by_member(self, member: str) -> str | None:
167 if self.use_redis_publishers and self._publishers_redis:
168 pub_data = self._publishers_redis.get_by_member(member)
169 if pub_data:
170 return str(pub_data['name'])
171 return None
172 if self.publishers_mapping and member in self.publishers_mapping:
173 return str(self.publishers_mapping[member]['name'])
174 return None
def get_venue_name(self, item:dict, row:dict) -> str:
    """Return the venue of *item* as ``'Title [id1 id2 ...]'``.

    The title is the first ``container-title`` (or the value itself when
    it is not a list) after ``self.clean_markup``. If the title contains
    a square-bracketed group matched by ``ids_inside_square_brackets``,
    that pair of brackets is turned into parentheses so it cannot be
    confused with the identifier list appended here.

    ISBNs / ISSNs are appended through ``self.id_worker`` only for the
    item types listed below, i.e. when the identifier belongs to the
    container and not to the item itself.

    Args:
        item: the work record (``container-title``, ``ISBN`` and ``ISSN``
            keys are read).
        row: a mapping whose ``'type'`` entry is the item type with
            spaces instead of hyphens; it is only read when the item has
            an ``ISBN`` or ``ISSN`` key.

    Returns:
        The venue string, or ``''`` when there is no container title.
    """
    container_title = item.get('container-title')
    if not container_title:
        return ''
    if isinstance(container_title, list):
        ventit = str(container_title[0])
    else:
        ventit = str(container_title)
    ventit = self.clean_markup(ventit)

    ambiguous_brackets = re.search(ids_inside_square_brackets, ventit)
    if ambiguous_brackets:
        # Locate the brackets from the match position. Searching for the
        # matched text with str.find would hit an earlier occurrence of
        # the same text (or index 0 for an empty group) and overwrite the
        # wrong characters.
        ids_start, ids_end = ambiguous_brackets.span(1)
        if ids_start >= 0:
            open_bracket = ventit.rfind('[', 0, ids_start)
            close_bracket = ventit.find(']', ids_end)
            if open_bracket >= 0 and close_bracket >= 0:
                ventit = (
                    ventit[:open_bracket] + '('
                    + ventit[open_bracket + 1:close_bracket] + ')'
                    + ventit[close_bracket + 1:]
                )

    venids_list = []
    if 'ISBN' in item and row['type'] in {
        'book chapter', 'book part', 'book section', 'book track', 'reference entry',
    }:
        self.id_worker(item['ISBN'], venids_list, self.isbn_worker)
    # 'report series' is listed here because a container title is always
    # present at this point, which is the only extra condition that type
    # required.
    if 'ISSN' in item and row['type'] in {
        'book', 'data file', 'dataset', 'edited book', 'journal article', 'journal volume',
        'journal issue', 'monograph', 'proceedings', 'peer review', 'reference book',
        'reference entry', 'report', 'report series',
    }:
        self.id_worker(item['ISSN'], venids_list, self.issn_worker)

    if venids_list:
        return ventit + ' [' + ' '.join(venids_list) + ']'
    return ventit
def extract_all_ids(
    self, entity_dict: dict, is_citing: bool
) -> tuple[list[str], list[str]]:
    """Collect the normalised identifiers carried by a work record.

    With ``is_citing`` true, the ORCIDs of the record's authors and
    editors are normalised with ``self.orcid_m`` and returned as the
    second list. With ``is_citing`` false, the DOIs found in the record's
    ``reference`` entries are normalised with ``self.doi_m`` and returned
    as the first list. Identifiers that fail normalisation are dropped
    and duplicates are removed; the other list is always empty.

    Returns:
        ``(bibliographic resource ids, responsible agent ids)``.
    """
    all_br: set[str] = set()
    all_ra: set[str] = set()

    if is_citing:
        # ORCIDs of the agents of the citing entity.
        for role in ("author", "editor"):
            for agent in entity_dict.get(role) or ():
                if "ORCID" not in agent:
                    continue
                orcid = self.orcid_m.normalise(agent["ORCID"], include_prefix=True)
                if orcid:
                    all_ra.add(orcid)
    else:
        # DOIs of the entities cited by the record.
        cited_dois = [ref["DOI"] for ref in entity_dict.get("reference", []) if ref.get("DOI")]
        for cited_doi in cited_dois:
            norm_id = self.doi_m.normalise(cited_doi, include_prefix=True)
            if norm_id:
                all_br.add(norm_id)

    return list(all_br), list(all_ra)
def get_agents_strings_list(self, doi: str, agents_list: List[dict]) -> Tuple[list, list]:
    """Build the display strings of the authors and editors of a work.

    Each agent is rendered as ``"Family, Given"`` (see ``_format_person``
    for the fallbacks) and, when an ORCID can be associated with it,
    suffixed with ``" [orcid:...]"``. The ORCID comes from the agent's own
    ``orcid`` / ``ORCID`` field, checked through
    ``self.find_crossref_orcid``; failing that, it is matched by name
    against the entries returned by ``self.orcid_finder(doi)``.

    Args:
        doi: DOI of the work; passed to ``self.orcid_finder`` and
            ``self.find_crossref_orcid``. When falsy, no index lookup is
            made.
        agents_list: agent dicts; the keys read here are ``role``,
            ``family``, ``given``, ``name``, ``orcid`` and ``ORCID``.

    Returns:
        ``(authors, editors)``: two lists of display strings in input
        order. Agents whose ``role`` is neither ``"author"`` nor
        ``"editor"``, and agents without any usable name, are dropped.

    Side effects:
        Every ORCID attached to a display string is recorded with
        ``self.tmp_orcid_m.storage_manager.set_value(orcid, True)``.
    """
    authors_strings_list = []
    editors_string_list = []

    # --- 1) DOI → index lookup ---
    raw_index = self.orcid_finder(doi) if doi else None

    # --- 2) Index parser → list of candidates (family, given, normalised orcid) ---
    def _split_name(text: str) -> Tuple[str, Optional[str]]:
        """Split an index name into (family, given), ignoring a trailing "[...]"."""
        base = re.sub(r'\s*\[.*?\]\s*$', '', text).strip()
        if ',' in base:
            fam, giv = [p.strip() for p in base.split(',', 1)]
            return fam, (giv if giv else None)
        # No comma: the last whitespace-separated token is taken as the family name.
        toks = base.split()
        if len(toks) >= 2:
            return toks[-1], ' '.join(toks[:-1])
        return base, None

    candidates: List[Tuple[str, Optional[str], str]] = []
    if raw_index:
        # NOTE(review): raw_index is iterated as a mapping of ORCID -> name
        # string; confirm the exact shape against orcid_finder.
        for k, v in raw_index.items():
            oc = k if str(k).lower().startswith("orcid:") else f"orcid:{k}"
            oc_norm = self.orcid_m.normalise(oc, include_prefix=True)
            if not oc_norm:
                continue
            fam, giv = _split_name(v)
            candidates.append((fam.lower(), (giv or "").lower() or None, oc_norm))

    # --- 3) Agent clean-up ---
    # Only the name fields are cleaned; every other key is copied as-is.
    agents_list = [
        {
            k: Cleaner(v).remove_unwanted_characters()
            if k in {"family", "given", "name"} and v is not None else v
            for k, v in agent_dict.items()
        }
        for agent_dict in agents_list
    ]

    def _format_person(a: dict) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        """Return (family, given, display string) for an agent.

        Preference order: family + given, then ``name`` (split on the first
        comma when it has one), then family alone (rendered "Family, "),
        then given alone (rendered ", Given"). All three are None when the
        agent has no name at all.
        """
        family = a.get("family")
        given = a.get("given")
        if family and given:
            return family, given, f"{family}, {given}"
        if a.get("name"):
            base = a["name"]
            if "," in base:
                fam, giv = [p.strip() for p in base.split(",", 1)]
                return fam, (giv if giv else None), (f"{fam}, {giv}" if giv else fam)
            return base, None, base
        if family:
            return family, given, f"{family}, {given or ''}"
        if given:
            return None, given, f", {given}"
        return None, None, None

    # --- local helpers for normalisation and initial ---
    def _norm(s: str) -> str:
        """Lower-case, strip, and collapse runs of whitespace to one space."""
        return re.sub(r"\s+", " ", (s or "").strip().lower())

    def _initial_from_given(given: Optional[str]) -> str:
        """Return the first ASCII letter or digit of the first given-name token, lower-cased."""
        if not given:
            return ""
        first_token = re.split(r"[\s\-]+", given.strip())[0]
        # Only ASCII alphanumerics are considered; other characters are skipped.
        m = re.search(r"[A-Za-z0-9]", first_token)
        return m.group(0).lower() if m else ""

    # --- counters for disambiguation *per role* (author/editor) ---
    name_counts = defaultdict(int)  # key: (role, fam_norm, given_norm)
    initial_counts = defaultdict(int)  # key: (role, fam_norm, initial)

    for _a in agents_list:
        role_n = (_a.get("role") or "").strip().lower()
        fam_n = _norm(_a.get("family") or "")
        giv_n = _norm(_a.get("given") or "")
        ini_n = _initial_from_given(_a.get("given"))
        name_counts[(role_n, fam_n, giv_n)] += 1
        if fam_n and ini_n:
            initial_counts[(role_n, fam_n, ini_n)] += 1

    # --- set of (family_norm, given_norm) pairs present in the batch ---
    name_pairs = set()
    for _a in agents_list:
        fam_n = _norm(_a.get("family") or "")
        giv_n = _norm(_a.get("given") or "")
        name_pairs.add((fam_n, giv_n))

    def _match_orcid(fam: Optional[str], giv: Optional[str], role: str) -> Optional[str]:
        """Return the index ORCID that matches the given name, or None.

        None is returned whenever the match is missing or ambiguous.
        """
        if not fam:
            return None

        role_n = (role or "").strip().lower()
        fam_n = _norm(fam)
        giv_n = _norm(giv) if giv else ""
        init = _initial_from_given(giv)

        # filter index candidates by family name (token-subset: handles
        # compound surnames without short fragments such as "li" matching
        # "gladilin")
        def fam_ok(cf: str) -> bool:
            return families_match(cf, fam_n)

        cands = [(cf, cg, oc) for (cf, cg, oc) in candidates if fam_ok(cf)]
        if not cands:
            return None

        # A) exact homonyms within the *same role* → do not attach a tag
        if name_counts.get((role_n, fam_n, giv_n), 0) > 1:
            return None

        # 1) STRONG MATCH: identical full given name
        strong = [(cf, cg, oc) for (cf, cg, oc) in cands if cg and giv_n and _norm(cg) == giv_n]
        if len(strong) == 1:
            return strong[0][2]
        elif len(strong) > 1:
            # Several index entries match: accept only if they all carry the same ORCID.
            orcids = {oc for (_, _, oc) in strong if oc}
            return orcids.pop() if len(orcids) == 1 else None

        # --- inversion guard ---
        # If the batch contains the inverted pair (family=cg, given=fam) for an index candidate,
        # disable the initial-based fallback (avoids tagging a false positive).
        inversion_present = any(
            cg and (_norm(cg), fam_n) in name_pairs
            for (_, cg, _) in cands
        )
        if inversion_present:
            return None

        # 2) FALLBACK ON INITIAL (only if UNIQUE *within the same role*)
        if init:
            if initial_counts.get((role_n, fam_n, init), 0) > 1:
                return None
            cands_init = [(cf, cg, oc) for (cf, cg, oc) in cands if _initial_from_given(cg) == init]
            if len(cands_init) == 1:
                return cands_init[0][2]
            elif len(cands_init) > 1:
                orcids = {oc for (_, _, oc) in cands_init if oc}
                return orcids.pop() if len(orcids) == 1 else None

        return None

    # --- 4) Build the output lists ---
    for agent in agents_list:
        role = agent.get("role", "")
        fam, giv, display = _format_person(agent)
        if not display:
            continue

        oc = None
        # 1) ORCID in the metadata
        for key in ("orcid", "ORCID"):
            if key in agent and agent[key]:
                raw = agent[key][0] if isinstance(agent[key], list) else agent[key]
                oc = self.find_crossref_orcid(raw, doi)
                # Only the first populated key is considered.
                break

        # 2) DOI→ORCID index
        if not oc and candidates:
            oc = _match_orcid(fam, giv, role)

        if oc:
            if not oc.startswith("orcid:"):
                oc = f"orcid:{oc}"
            display = f"{display} [{oc}]"
            self.tmp_orcid_m.storage_manager.set_value(oc, True)

        if role == "author":
            authors_strings_list.append(display)
        elif role == "editor":
            editors_string_list.append(display)

    return authors_strings_list, editors_string_list
def find_crossref_orcid(self, identifier, doi):
    """Return the normalised ORCID for *identifier* if it can be trusted, else ``""``.

    Checks, in order:

    1. *identifier* must be a string that ``self.orcid_m`` can normalise;
    2. a definite answer from ``self.validated_as`` (``True`` / ``False``)
       is final;
    3. the ORCID is accepted when the DOI→ORCID index
       (``self.orcid_finder``) lists it for *doi*, looked up both by the
       normalised DOI and by the DOI as given when it lacks the ``doi:``
       prefix;
    4. with the ORCID API disabled, it is accepted only when present in
       ``self._redis_values_ra``;
    5. with the ORCID API enabled, it is accepted when
       ``self.to_validated_id_list`` returns it.

    ORCIDs accepted at steps 3 and 4 are recorded with
    ``self.tmp_orcid_m.storage_manager.set_value(orcid, True)``.
    """
    if not isinstance(identifier, str):
        return ""

    norm_orcid = self.orcid_m.normalise(identifier, include_prefix=True)
    if not norm_orcid:
        return ""

    verdict = self.validated_as({"schema": "orcid", "identifier": norm_orcid})
    if verdict is True:
        return norm_orcid
    if verdict is False:
        return ""

    # Keys under which the DOI may appear in the DOI→ORCID index.
    lookup_keys = []
    if doi:
        lookup_keys.append(self.doi_m.normalise(doi, include_prefix=True))
        if not str(doi).lower().startswith("doi:"):
            lookup_keys.append(doi)

    indexed_orcids = set()
    for lookup_key in lookup_keys:
        if not lookup_key:
            continue
        hits = self.orcid_finder(lookup_key)
        if hits:
            for found in hits.keys():
                indexed_orcids.add(found.replace("orcid:", "").strip())

    bare_orcid = norm_orcid.split(":", 1)[1]
    # Accepted either through the index or, with the API off, through the
    # Redis snapshot; both paths record the ORCID in the temporary storage.
    if bare_orcid in indexed_orcids or (
        not self.use_orcid_api and norm_orcid in self._redis_values_ra
    ):
        self.tmp_orcid_m.storage_manager.set_value(norm_orcid, True)
        return norm_orcid

    if not self.use_orcid_api:
        return ""

    # API on: rely on the validating id manager.
    if norm_orcid in self.to_validated_id_list({"id": norm_orcid, "schema": "orcid"}):
        return norm_orcid
    return ""