Coverage for oc_ds_converter / ra_processor.py: 86%
178 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it>
3#
4# SPDX-License-Identifier: ISC
6import os
7import re
8import unicodedata
9from csv import DictReader
10from typing import List, Tuple
11from zipfile import ZipFile
13from oc_ds_converter.oc_idmanager import ISBNManager, ISSNManager, ORCIDManager
15from oc_ds_converter.datasource.orcid_index import OrcidIndexInterface
16from oc_ds_converter.lib.cleaner import Cleaner
17from oc_ds_converter.lib.csvmanager import CSVManager
18from oc_ds_converter.lib.master_of_regex import orcid_pattern
class RaProcessor(object):
    """Base class for responsible-agent (RA) processors.

    Holds the state shared by the concrete data-source processors:
    an optional publisher id/name/prefix mapping, a DOI->ORCID index
    and, optionally, the set of citing-entity ids loaded from CSV dumps.
    """

    def __init__(
        self,
        orcid_index: str | OrcidIndexInterface | None = None,
        publishers_filepath: str | None = None,
        citing_entities: str | None = None,
    ):
        """
        :param orcid_index: a CSV path, a ready-made index object, or None for an empty index
        :param publishers_filepath: path of the publishers' mapping CSV, if any
        :param citing_entities: directory holding the citing entities CSV dumps (possibly zipped)
        """
        self.publishers_mapping = self.load_publishers_mapping(publishers_filepath) if publishers_filepath else None
        if orcid_index is None:
            # An empty CSVManager behaves as a no-result index.
            self.orcid_index: OrcidIndexInterface = CSVManager(None)
        elif isinstance(orcid_index, str):
            self.orcid_index = CSVManager(orcid_index)
        else:
            self.orcid_index = orcid_index
        # Single branch instead of testing citing_entities twice.
        if citing_entities:
            self.unzip_citing_entities(citing_entities)
            self.citing_entities_set = CSVManager.load_csv_column_as_set(citing_entities, 'id')
        else:
            self.citing_entities_set = None
    def get_agents_strings_list(self, doi: str, agents_list: List[dict]) -> Tuple[list, list]:
        """Build human-readable agent strings ('Family, Given [orcid:...]') for a publication.

        :params doi: the DOI of the processed entity, used to query the DOI->ORCID index
        :params agents_list: list of agent dicts; each has a 'role' key plus
            'family'/'given'/'name' fields and, optionally, an 'orcid'/'ORCID'
            field (a string or a list of strings)
        :returns: Tuple[list, list] -- the authors' strings list and the editors' strings list
        """
        authors_strings_list = list()
        editors_string_list = list()
        dict_orcid = None
        # Query the DOI->ORCID index only if at least one agent lacks an explicit ORCID.
        if not all('orcid' in agent or 'ORCID' in agent for agent in agents_list):
            dict_orcid = self.orcid_finder(doi)
        # Strip unwanted characters from the name fields only; other fields pass through.
        agents_list = [
            {k: Cleaner(v).remove_unwanted_characters() if k in {'family', 'given', 'name'} and v is not None
             else v for k, v in agent_dict.items()} for agent_dict in agents_list]
        for agent in agents_list:
            cur_role = agent['role']
            f_name = None
            g_name = None
            agent_string = None
            if agent.get('family') and agent.get('given'):
                f_name = agent['family']
                g_name = agent['given']
                agent_string = f_name + ', ' + g_name
            elif agent.get('name'):
                # Fall back to the free-text 'name'; split on the comma, if present.
                agent_string = agent['name']
                f_name = agent_string.split(",")[0].strip() if "," in agent_string else None
                g_name = agent_string.split(",")[-1].strip() if "," in agent_string else None
                if f_name and g_name:
                    agent_string = f_name + ', ' + g_name
            if agent_string is None:
                # Only one of the two name parts is available.
                if agent.get('family') and not agent.get('given'):
                    if g_name:
                        agent_string = agent['family'] + ', ' + g_name
                    else:
                        agent_string = agent['family'] + ', '
                elif agent.get('given') and not agent.get('family'):
                    if f_name:
                        agent_string = f_name + ', ' + agent['given']
                    else:
                        agent_string = ', ' + agent['given']
            orcid = None
            if 'orcid' in agent:
                if isinstance(agent['orcid'], list):
                    orcid = str(agent['orcid'][0])
                else:
                    orcid = str(agent['orcid'])
            elif 'ORCID' in agent:
                if isinstance(agent['ORCID'], list):
                    orcid = str(agent['ORCID'][0])
                else:
                    orcid = str(agent['ORCID'])
            if orcid:
                # Validate the explicitly provided ORCID offline (no API call).
                orcid_manager = ORCIDManager(use_api_service=False)
                orcid = orcid_manager.normalise(orcid, include_prefix=False)
                orcid = orcid if orcid_manager.check_digit(orcid) else None
            elif dict_orcid and f_name:
                # No explicit ORCID: try to match the agent's name against the index entries.
                for ori in dict_orcid:
                    orc_n: List[str] = dict_orcid[ori].split(', ')
                    orc_f = orc_n[0].lower()
                    orc_g = orc_n[1] if len(orc_n) == 2 else None
                    if f_name.lower() in orc_f.lower() or orc_f.lower() in f_name.lower():
                        if g_name and orc_g:
                            # If there are several authors with the same surname
                            if len([person for person in agents_list if 'family' in person if person['family'] if
                                    person['family'].lower() in orc_f.lower() or orc_f.lower() in person[
                                        'family'].lower()]) > 1:
                                # If there are several authors with the same surname and the same given names' initials
                                if len([person for person in agents_list if 'given' in person if person['given'] if
                                        person['given'][0].lower() == orc_g[0].lower()]) > 1:
                                    homonyms_list = [person for person in agents_list if 'given' in person if
                                                     person['given'] if person['given'].lower() == orc_g.lower()]
                                    # If there are homonyms
                                    if len(homonyms_list) > 1:
                                        # If such homonyms have different roles from the current role
                                        if [person for person in homonyms_list if person['role'] != cur_role]:
                                            if orc_g.lower() == g_name.lower():
                                                orcid = ori
                                    else:
                                        if orc_g.lower() == g_name.lower():
                                            orcid = ori
                                elif orc_g[0].lower() == g_name[0].lower():
                                    orcid = ori
                            # If there is a person whose given name is equal to the family name of the current person (a common situation for cjk names)
                            elif any([person for person in agents_list if 'given' in person if person['given'] if
                                      person['given'].lower() == f_name.lower()]):
                                if orc_g.lower() == g_name.lower():
                                    orcid = ori
                            else:
                                orcid = ori
                        else:
                            orcid = ori
            if agent_string and orcid:
                agent_string += ' [' + 'orcid:' + str(orcid) + ']'
            if agent_string:
                if agent['role'] == 'author':
                    authors_strings_list.append(agent_string)
                elif agent['role'] == 'editor':
                    editors_string_list.append(agent_string)
        return authors_strings_list, editors_string_list
136 def orcid_finder(self, doi: str) -> dict[str, str]:
137 found: dict[str, str] = {}
138 doi = doi.lower()
139 people = self.orcid_index.get_value(doi)
140 if people:
141 for person in people:
142 match = re.search(orcid_pattern, person)
143 if match:
144 orcid = match.group(0)
145 name: str = person[:person.find(orcid)-1]
146 found[orcid] = name.strip().lower()
147 return found
149 def unzip_citing_entities(self, citing_entities):
150 for dirpath, _, filenames in os.walk(citing_entities):
151 for filename in filenames:
152 if filename.endswith('.zip'):
153 with ZipFile(os.path.join(citing_entities, filename), mode='r') as zipf:
154 zipf.extractall(citing_entities)
155 os.remove(os.path.join(citing_entities, filename))
157 def get_pages(self, pages_list:list) -> str:
158 '''
159 This function returns the pages interval.
161 :params pages_list: a list of pages
162 :type item: dict
163 :returns: str -- The output is a string in the format 'START-END', for example, '583-584'. If there are no pages, the output is an empty string.
164 '''
165 roman_letters = {'I', 'V', 'X', 'L', 'C', 'D', 'M'}
166 clean_pages_list = list()
167 for page in pages_list:
168 # e.g. 583-584 or 1_583-1_584
169 if all(c.isdigit() or c == "_" for c in page):
170 clean_pages_list.append(page)
171 # e.g. G27. It is a born digital document. PeerJ uses this approach, where G27 identifies the whole document, since it has no pages.
172 elif len(pages_list) == 1:
173 clean_pages_list.append(page)
174 # e.g. iv-vii. This syntax is used in the prefaces.
175 elif all(c.upper() in roman_letters for c in page):
176 clean_pages_list.append(page)
177 # 583b-584. It is an error. The b must be removed.
178 elif any(c.isdigit() for c in page):
179 page_without_letters = ''.join([c for c in page if c.isdigit() or c == '_'])
180 clean_pages_list.append(page_without_letters)
181 if clean_pages_list:
182 if len(clean_pages_list) == 1:
183 clean_pages_list.append(clean_pages_list[0])
184 return '-'.join(clean_pages_list)
185 return ''
187 @staticmethod
188 def normalise_unicode(metadata: dict) -> dict:
189 return {k:unicodedata.normalize('NFKC', v) for k, v in metadata.items()}
191 @staticmethod
192 def id_worker(field, ids:list, func) -> None:
193 if isinstance(field, list):
194 for i in field:
195 func(str(i), ids)
196 else:
197 id = str(field)
198 func(id, ids)
200 @staticmethod
201 def load_publishers_mapping(publishers_filepath: str) -> dict[str, dict[str, str | set[str]]]:
202 publishers_mapping: dict[str, dict[str, str | set[str]]] = {}
203 with open(publishers_filepath, 'r', encoding='utf-8') as f:
204 data = DictReader(f)
205 for row in data:
206 pub_id = row['id']
207 if pub_id not in publishers_mapping:
208 publishers_mapping[pub_id] = {'name': row['name'], 'prefixes': set()}
209 else:
210 publishers_mapping[pub_id]['name'] = row['name']
211 prefixes = publishers_mapping[pub_id]['prefixes']
212 if isinstance(prefixes, set):
213 prefixes.add(row['prefix'])
214 return publishers_mapping
216 @staticmethod
217 def issn_worker(issnid: str, ids: list) -> None:
218 issn_manager = ISSNManager()
219 norm_issnid = issn_manager.normalise(issnid, include_prefix=False)
220 if norm_issnid and issn_manager.check_digit(norm_issnid) and f'issn:{norm_issnid}' not in ids:
221 ids.append('issn:' + norm_issnid)
223 @staticmethod
224 def isbn_worker(isbnid: str, ids: list) -> None:
225 isbn_manager = ISBNManager()
226 norm_isbnid = isbn_manager.normalise(isbnid, include_prefix=False)
227 if norm_isbnid and isbn_manager.check_digit(norm_isbnid) and f'isbn:{norm_isbnid}' not in ids:
228 ids.append('isbn:' + norm_isbnid)
230 @staticmethod
231 def uppercase_initials(inp_str: str):
232 upper_word_list = []
233 words_list = inp_str.split()
234 for w in words_list:
235 upper_word_list.append(w[0].upper() + w[1:]) if len(w)>1 else upper_word_list.append(w[0].upper())
236 upper_str = " ".join(upper_word_list)
237 return upper_str