Coverage for oc_ds_converter / oc_idmanager / orcid.py: 76%
159 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it>
4#
5# SPDX-License-Identifier: ISC
6import re
7from json import loads
8from re import match, sub
9from time import sleep
10from urllib.parse import quote
11import datetime
13from oc_ds_converter.oc_idmanager.base import IdentifierManager
14from requests import ReadTimeout, get
15from requests.exceptions import ConnectionError
16from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager
17from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
19# POSSIBLE EXTENSION: add a new parameter so that the input ORCID-DOI map can be used directly by the ORCID manager
class ORCIDManager(IdentifierManager):
    """Identifier manager for ORCID iDs.

    Validation combines three checks: the syntactic shape of the identifier,
    its trailing check character, and (optionally) a lookup against the ORCID
    public API. Validation outcomes are cached through a storage manager.
    """

    def __init__(self, use_api_service: bool = True, storage_manager: StorageManager | None = None, testing: bool = True) -> None:
        """Build the manager.

        Args:
            use_api_service: when False, ``exists`` never contacts the API and
                reports every identifier as existing.
            storage_manager: cache for validation results; when omitted a
                ``RedisStorageManager(testing=testing)`` is created.
            testing: forwarded to ``RedisStorageManager`` only when no storage
                manager is supplied.
        """
        super().__init__()
        self._api = "https://pub.orcid.org/v3.0/"
        self._use_api_service = use_api_service
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager
        self._p = "orcid:"

    def validated_as_id(self, id_string):
        """Return the cached validity of ``id_string``, or None if not cached.

        The identifier is looked up exactly as given (no normalisation).
        Only a boolean stored value counts as a cached answer.
        """
        cached = self.storage_manager.get_value(id_string)
        return cached if isinstance(cached, bool) else None

    def is_valid(self, id_string, get_extra_info=False):
        """Tell whether ``id_string`` is a valid ORCID iD.

        Args:
            id_string: the identifier, with or without the ``orcid:`` prefix.
            get_extra_info: when True, return ``(validity, info_dict)``
                instead of the bare boolean.

        Returns:
            ``bool``, or ``(bool, dict)`` when ``get_extra_info`` is True.
            A cached boolean in the storage manager is returned as is;
            otherwise the result is computed and stored.
        """
        orcid = self.normalise(id_string, include_prefix=True)
        if orcid is None:
            if get_extra_info:
                return False, {"id": orcid, "valid": False}
            return False

        cached = self.storage_manager.get_value(orcid)
        if isinstance(cached, bool):
            if get_extra_info:
                return cached, {"id": orcid, "valid": cached}
            return cached

        if get_extra_info:
            found, info = self.exists(orcid, get_extra_info=True)
            self.storage_manager.set_full_value(orcid, info)
            return (found and self.check_digit(orcid) and self.syntax_ok(orcid)), info

        # Cheap local checks first: the API is only queried when they pass.
        validity_check = self.syntax_ok(orcid) and self.check_digit(orcid) and self.exists(orcid)
        self.storage_manager.set_value(orcid, validity_check)
        return validity_check

    def normalise(self, id_string, include_prefix=False):
        """Return ``id_string`` in the ``NNNN-NNNN-NNNN-NNNX`` layout.

        Every character other than a digit or ``X`` (after upper-casing) is
        dropped, and the first 16 remaining characters are grouped by four.
        No length check is made here: a short input yields short or empty
        groups, which ``syntax_ok`` rejects later.

        Args:
            id_string: the raw identifier.
            include_prefix: prepend ``orcid:`` to the result when True.

        Returns:
            The normalised string, or None when ``id_string`` is not a string.
        """
        try:
            digits = sub("[^X0-9]", "", id_string.upper())
        except (AttributeError, TypeError):
            # Non-string input (None, numbers, bytes, ...) cannot be normalised.
            return None
        prefix = self._p if include_prefix else ""
        return "%s%s-%s-%s-%s" % (prefix, digits[:4], digits[4:8], digits[8:12], digits[12:16])

    def check_digit(self, orcid):
        """Verify the trailing check character of ``orcid``.

        The expected character is computed from all cleaned characters but
        the last one (doubling running total, modulo 11, with 10 written as
        ``X``) and compared with the last cleaned character. The comparison
        uses the cleaned, upper-cased string, so a lowercase ``x`` or stray
        trailing whitespace does not cause a spurious failure.

        Returns:
            True when the check character matches; False otherwise, including
            when the input contains no digit at all.
        """
        digits = sub("[^X0-9]", "", orcid.removeprefix(self._p).upper())
        if not digits:
            return False
        total = 0
        for char in digits[:-1]:
            total = (total + (10 if char == "X" else int(char))) * 2
        expected = (12 - total % 11) % 11
        return digits[-1] == ("X" if expected == 10 else str(expected))

    def syntax_ok(self, id_string):
        """Tell whether ``id_string`` has the ORCID shape.

        The ``orcid:`` prefix is added when missing; the string must then be
        four hyphen-separated groups of four digits, where the very last
        character may also be ``X`` (matched case-insensitively).
        """
        if not id_string.startswith(self._p):
            id_string = self._p + id_string
        return bool(match("^orcid:([0-9]{4}-){3}[0-9]{3}[0-9X]$", id_string, re.IGNORECASE))

    def exists(self, orcid, get_extra_info=False, allow_extra_api=None):
        """Check through the ORCID public API that ``orcid`` is registered.

        Args:
            orcid: the identifier, with or without prefix.
            get_extra_info: when True, return ``(validity, info_dict)``.
            allow_extra_api: unused; kept for interface compatibility.

        Returns:
            ``bool``, or ``(bool, dict)`` when ``get_extra_info`` is True.
            When the API service is disabled the identifier is reported as
            existing without any request. With the API enabled, up to three
            requests are attempted; the result is False when none of them
            returns HTTP 200.
        """
        info_dict = {"id": orcid}
        valid_bool = True
        if self._use_api_service:
            # NOTE(review): _headers is not defined in this class; presumably
            # it is set up by IdentifierManager.__init__ - confirm.
            self._headers["Accept"] = "application/json"
            orcid = self.normalise(orcid)
            info_dict = {"id": orcid}
            if orcid is None:
                if get_extra_info:
                    info_dict["valid"] = False
                    return False, info_dict
                return False

            for _attempt in range(3):
                try:
                    r = get(self._api + quote(orcid), headers=self._headers, timeout=30)
                    if r.status_code == 200:
                        r.encoding = "utf-8"
                        json_res = loads(r.text)
                        # Tolerate a payload without "orcid-identifier"
                        # instead of failing on None.get(...).
                        identifier = json_res.get("orcid-identifier") or {}
                        valid_bool = identifier.get("path") == orcid
                        if get_extra_info:
                            info_dict.update(self.extra_info(json_res))
                            # extra_info always reports valid=True; keep the
                            # dictionary aligned with the actual outcome.
                            info_dict["valid"] = valid_bool
                            return valid_bool, info_dict
                        return valid_bool
                except ReadTimeout:
                    # Do nothing, just try again
                    pass
                except ConnectionError:
                    # Sleep 5 seconds, then try again
                    sleep(5)
            # Every attempt failed or returned a non-200 status.
            valid_bool = False

        if get_extra_info:
            info_dict["valid"] = valid_bool
            return valid_bool, info_dict
        return valid_bool

    @staticmethod
    def _dig(data, *keys, default=""):
        """Follow ``keys`` through nested containers, or return ``default``."""
        try:
            for key in keys:
                data = data[key]
        except (KeyError, TypeError, IndexError):
            return default
        return data

    def _history_date(self, api_response, field):
        """Return ``history[field]["value"]`` as a date string, or ``""``."""
        value = self._dig(api_response, "history", field, "value", default=None)
        try:
            return self.timestamp_to_date(value)
        except (TypeError, ValueError, OverflowError, OSError):
            # Missing, non-numeric or out-of-range timestamp.
            return ""

    def extra_info(self, api_response, choose_api=None, info_dict=None):
        """Extract descriptive metadata from an ORCID API record.

        Missing or malformed parts of the record never raise: the
        corresponding field falls back to an empty value.

        Args:
            api_response: the decoded JSON record.
            choose_api: unused; kept for interface compatibility.
            info_dict: unused; kept for interface compatibility.

        Returns:
            A dict with the keys ``valid`` (always True), ``family_name``,
            ``given_name``, ``email``, ``external_identifiers``,
            ``submission_date`` and ``update_date``.
        """
        family_name = self._dig(api_response, "person", "name", "family-name", "value")
        given_name = self._dig(api_response, "person", "name", "given-names", "value")

        emails = self._dig(api_response, "person", "emails", "email", default=None)
        email = str(emails) if emails else ""

        # "external-identifiers" is accepted both as a single object holding
        # an "external-identifier" list and as a list of such objects.
        container = self._dig(api_response, "person", "external-identifiers", default=None)
        if isinstance(container, dict):
            groups = [container]
        elif isinstance(container, list):
            groups = container
        else:
            groups = []
        external_identifiers = {}
        try:
            for group in groups:
                for item in group["external-identifier"]:
                    external_identifiers[item.get("external-id-type")] = item.get("external-id-value")
        except (KeyError, TypeError, AttributeError):
            # A malformed entry invalidates the whole collection.
            external_identifiers = {}

        return {
            "valid": True,
            "family_name": family_name,
            "given_name": given_name,
            "email": email,
            "external_identifiers": external_identifiers,
            "submission_date": self._history_date(api_response, "submission-date"),
            "update_date": self._history_date(api_response, "last-modified-date"),
        }

    def timestamp_to_date(self, timestamp_value):
        """Convert a timestamp in milliseconds to a ``YYYY-MM-DD`` string.

        The conversion is done in UTC so the result does not depend on the
        timezone of the machine running the code.
        """
        moment = datetime.datetime.fromtimestamp(timestamp_value / 1000, tz=datetime.timezone.utc)
        return moment.strftime("%Y-%m-%d")