Coverage for oc_ds_converter / ra_processor.py: 86%
184 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-12 21:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-06-12 21:23 +0000
1# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it>
3#
4# SPDX-License-Identifier: ISC
6import os
7import re
8import unicodedata
9from csv import DictReader
10from typing import List, Tuple
11from zipfile import ZipFile
13from oc_ds_converter.oc_idmanager import ISBNManager, ISSNManager, ORCIDManager
15from oc_ds_converter.datasource.orcid_index import OrcidIndexInterface
16from oc_ds_converter.lib.cleaner import Cleaner
17from oc_ds_converter.lib.csvmanager import CSVManager
18from oc_ds_converter.lib.master_of_regex import orcid_pattern
def families_match(a: str, b: str) -> bool:
    """Tell whether two family names are compatible on a whitespace-token basis.

    Both names are case-folded and split on whitespace; they match when the
    token set of one is a subset of the token set of the other, so
    ``"Garcia Lopez"`` matches ``"Lopez"`` but ``"Smith"`` does not match
    ``"Smithson"``.

    :param a: first family name; ``None`` or an empty string never matches
    :param b: second family name; ``None`` or an empty string never matches
    :returns: bool -- True if one token set contains the other, False otherwise
    """
    # casefold() rather than lower(): it is the caseless-matching form, so
    # that e.g. "Weiß" and "WEISS" compare equal.
    tokens_a = set((a or "").casefold().split())
    tokens_b = set((b or "").casefold().split())
    if not tokens_a or not tokens_b:
        return False
    return tokens_a <= tokens_b or tokens_b <= tokens_a
29class RaProcessor(object):
30 def __init__(
31 self,
32 orcid_index: str | OrcidIndexInterface | CSVManager | None = None,
33 publishers_filepath: str | None = None,
34 citing_entities: str | None = None,
35 ):
36 self.publishers_mapping = self.load_publishers_mapping(publishers_filepath) if publishers_filepath else None
37 if orcid_index is None:
38 self.orcid_index: OrcidIndexInterface = CSVManager(None)
39 elif isinstance(orcid_index, str):
40 self.orcid_index = CSVManager(orcid_index)
41 else:
42 self.orcid_index = orcid_index
43 if citing_entities:
44 self.unzip_citing_entities(citing_entities)
45 self.citing_entities_set = CSVManager.load_csv_column_as_set(citing_entities, 'id') if citing_entities else None
47 def get_agents_strings_list(self, doi: str, agents_list: List[dict]) -> Tuple[list, list]:
48 authors_strings_list = list()
49 editors_string_list = list()
50 dict_orcid = None
51 if not all('orcid' in agent or 'ORCID' in agent for agent in agents_list):
52 dict_orcid = self.orcid_finder(doi)
53 agents_list = [
54 {k: Cleaner(v).remove_unwanted_characters() if k in {'family', 'given', 'name'} and v is not None
55 else v for k, v in agent_dict.items()} for agent_dict in agents_list]
57 for agent in agents_list:
58 cur_role = agent['role']
59 f_name = None
60 g_name = None
61 agent_string = None
62 if agent.get('family') and agent.get('given'):
63 f_name = agent['family']
64 g_name = agent['given']
65 agent_string = f_name + ', ' + g_name
66 elif agent.get('name'):
67 agent_string = agent['name']
68 f_name = agent_string.split(",")[0].strip() if "," in agent_string else None
69 g_name = agent_string.split(",")[-1].strip() if "," in agent_string else None
71 if f_name and g_name:
72 agent_string = f_name + ', ' + g_name
73 if agent_string is None:
74 if agent.get('family') and not agent.get('given'):
75 if g_name:
76 agent_string = agent['family'] + ', ' + g_name
77 else:
78 agent_string = agent['family'] + ', '
79 elif agent.get('given') and not agent.get('family'):
80 if f_name:
81 agent_string = f_name + ', ' + agent['given']
82 else:
83 agent_string = ', ' + agent['given']
84 orcid = None
85 if 'orcid' in agent:
86 if isinstance(agent['orcid'], list):
87 orcid = str(agent['orcid'][0])
88 else:
89 orcid = str(agent['orcid'])
90 elif 'ORCID' in agent:
91 if isinstance(agent['ORCID'], list):
92 orcid = str(agent['ORCID'][0])
93 else:
94 orcid = str(agent['ORCID'])
95 if orcid:
96 orcid_manager = ORCIDManager(use_api_service=False)
97 orcid = orcid_manager.normalise(orcid, include_prefix=False)
98 orcid = orcid if orcid_manager.check_digit(orcid) else None
99 elif dict_orcid and f_name:
100 for ori in dict_orcid:
101 orc_n: List[str] = dict_orcid[ori].split(', ')
102 orc_f = orc_n[0].lower()
103 orc_g = orc_n[1] if len(orc_n) == 2 else None
104 if families_match(f_name, orc_f):
105 if g_name and orc_g:
106 # If there are several authors with the same surname
107 if len([person for person in agents_list if 'family' in person if person['family'] if
108 families_match(person['family'], orc_f)]) > 1:
109 # If there are several authors with the same surname and the same given names' initials
110 if len([person for person in agents_list if 'given' in person if person['given'] if
111 person['given'][0].lower() == orc_g[0].lower()]) > 1:
112 homonyms_list = [person for person in agents_list if 'given' in person if
113 person['given'] if person['given'].lower() == orc_g.lower()]
114 # If there are homonyms
115 if len(homonyms_list) > 1:
116 # If such homonyms have different roles from the current role
117 if [person for person in homonyms_list if person['role'] != cur_role]:
118 if orc_g.lower() == g_name.lower():
119 orcid = ori
120 else:
121 if orc_g.lower() == g_name.lower():
122 orcid = ori
123 elif orc_g[0].lower() == g_name[0].lower():
124 orcid = ori
125 # If there is a person whose given name is equal to the family name of the current person (a common situation for cjk names)
126 elif any([person for person in agents_list if 'given' in person if person['given'] if
127 person['given'].lower() == f_name.lower()]):
128 if orc_g.lower() == g_name.lower():
129 orcid = ori
130 else:
131 orcid = ori
132 else:
133 orcid = ori
134 if agent_string and orcid:
135 agent_string += ' [' + 'orcid:' + str(orcid) + ']'
136 if agent_string:
137 if agent['role'] == 'author':
138 authors_strings_list.append(agent_string)
139 elif agent['role'] == 'editor':
140 editors_string_list.append(agent_string)
141 return authors_strings_list, editors_string_list
143 def orcid_finder(self, doi: str) -> dict[str, str]:
144 found: dict[str, str] = {}
145 doi = doi.lower()
146 people = self.orcid_index.get_value(doi)
147 if people:
148 for person in people:
149 match = re.search(orcid_pattern, person)
150 if match:
151 orcid = match.group(0)
152 name: str = person[:person.find(orcid)-1]
153 found[orcid] = name.strip().lower()
154 return found
156 def unzip_citing_entities(self, citing_entities):
157 for dirpath, _, filenames in os.walk(citing_entities):
158 for filename in filenames:
159 if filename.endswith('.zip'):
160 with ZipFile(os.path.join(citing_entities, filename), mode='r') as zipf:
161 zipf.extractall(citing_entities)
162 os.remove(os.path.join(citing_entities, filename))
164 def get_pages(self, pages_list:list) -> str:
165 '''
166 This function returns the pages interval.
168 :params pages_list: a list of pages
169 :type item: dict
170 :returns: str -- The output is a string in the format 'START-END', for example, '583-584'. If there are no pages, the output is an empty string.
171 '''
172 roman_letters = {'I', 'V', 'X', 'L', 'C', 'D', 'M'}
173 clean_pages_list = list()
174 for page in pages_list:
175 # e.g. 583-584 or 1_583-1_584
176 if all(c.isdigit() or c == "_" for c in page):
177 clean_pages_list.append(page)
178 # e.g. G27. It is a born digital document. PeerJ uses this approach, where G27 identifies the whole document, since it has no pages.
179 elif len(pages_list) == 1:
180 clean_pages_list.append(page)
181 # e.g. iv-vii. This syntax is used in the prefaces.
182 elif all(c.upper() in roman_letters for c in page):
183 clean_pages_list.append(page)
184 # 583b-584. It is an error. The b must be removed.
185 elif any(c.isdigit() for c in page):
186 page_without_letters = ''.join([c for c in page if c.isdigit() or c == '_'])
187 clean_pages_list.append(page_without_letters)
188 if clean_pages_list:
189 if len(clean_pages_list) == 1:
190 clean_pages_list.append(clean_pages_list[0])
191 return '-'.join(clean_pages_list)
192 return ''
194 @staticmethod
195 def normalise_unicode(metadata: dict) -> dict:
196 return {k:unicodedata.normalize('NFKC', v) for k, v in metadata.items()}
198 @staticmethod
199 def id_worker(field, ids:list, func) -> None:
200 if isinstance(field, list):
201 for i in field:
202 func(str(i), ids)
203 else:
204 id = str(field)
205 func(id, ids)
207 @staticmethod
208 def load_publishers_mapping(publishers_filepath: str) -> dict[str, dict[str, str | set[str]]]:
209 publishers_mapping: dict[str, dict[str, str | set[str]]] = {}
210 with open(publishers_filepath, 'r', encoding='utf-8') as f:
211 data = DictReader(f)
212 for row in data:
213 pub_id = row['id']
214 if pub_id not in publishers_mapping:
215 publishers_mapping[pub_id] = {'name': row['name'], 'prefixes': set()}
216 else:
217 publishers_mapping[pub_id]['name'] = row['name']
218 prefixes = publishers_mapping[pub_id]['prefixes']
219 if isinstance(prefixes, set):
220 prefixes.add(row['prefix'])
221 return publishers_mapping
223 @staticmethod
224 def issn_worker(issnid: str, ids: list) -> None:
225 issn_manager = ISSNManager()
226 norm_issnid = issn_manager.normalise(issnid, include_prefix=False)
227 if norm_issnid and issn_manager.check_digit(norm_issnid) and f'issn:{norm_issnid}' not in ids:
228 ids.append('issn:' + norm_issnid)
230 @staticmethod
231 def isbn_worker(isbnid: str, ids: list) -> None:
232 isbn_manager = ISBNManager()
233 norm_isbnid = isbn_manager.normalise(isbnid, include_prefix=False)
234 if norm_isbnid and isbn_manager.check_digit(norm_isbnid) and f'isbn:{norm_isbnid}' not in ids:
235 ids.append('isbn:' + norm_isbnid)
237 @staticmethod
238 def uppercase_initials(inp_str: str):
239 upper_word_list = []
240 words_list = inp_str.split()
241 for w in words_list:
242 upper_word_list.append(w[0].upper() + w[1:]) if len(w)>1 else upper_word_list.append(w[0].upper())
243 upper_str = " ".join(upper_word_list)
244 return upper_str