Coverage for oc_ds_converter / oc_idmanager / orcid.py: 76%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6import re 

7from json import loads 

8from re import match, sub 

9from time import sleep 

10from urllib.parse import quote 

11import datetime 

12 

13from oc_ds_converter.oc_idmanager.base import IdentifierManager 

14from requests import ReadTimeout, get 

15from requests.exceptions import ConnectionError 

16from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager 

17from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

18 

19# POSSIBLE EXTENSION: add a new parameter so that the ORCID manager can directly use the input ORCID-DOI map

class ORCIDManager(IdentifierManager):
    """This class implements an identifier manager for orcid identifier.

    Validation combines three checks: the textual shape of the identifier
    (``syntax_ok``), its trailing check character (``check_digit``) and,
    when the API service is enabled, a lookup on the public ORCID API
    (``exists``). Validation outcomes are read from and written to the
    configured storage manager.
    """

    # Retry policy for the ORCID API lookup performed by ``exists``.
    _MAX_ATTEMPTS = 3
    _TIMEOUT_SECONDS = 30
    _RETRY_DELAY_SECONDS = 5

    # Shape of a prefixed ORCID: four dash-separated groups of four
    # characters, the very last one being a digit or "X".
    _ORCID_PATTERN = re.compile(r"orcid:([0-9]{4}-){3}[0-9]{3}[0-9X]", re.IGNORECASE)

    # Errors raised when walking a JSON payload whose shape is not the expected one.
    _PAYLOAD_ERRORS = (LookupError, TypeError, AttributeError)

    def __init__(self, use_api_service: bool = True, storage_manager: StorageManager | None = None, testing: bool = True) -> None:
        """Orcid Manager constructor.

        :param use_api_service: when False, ``exists`` never contacts the ORCID API.
        :param storage_manager: storage used to cache validation outcomes; when
            None a ``RedisStorageManager(testing=testing)`` is created.
        :param testing: forwarded to ``RedisStorageManager`` only when no
            storage manager is supplied.
        """
        super().__init__()
        self._api = "https://pub.orcid.org/v3.0/"
        self._use_api_service = use_api_service
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager

        self._p = "orcid:"

    def validated_as_id(self, id_string):
        """Return the validity stored for ``id_string``, or None if none is stored.

        The identifier is looked up exactly as given (it is not normalised here).
        Only boolean stored values are reported; anything else yields None.
        """
        stored_value = self.storage_manager.get_value(id_string)
        return stored_value if isinstance(stored_value, bool) else None

    def is_valid(self, id_string, get_extra_info=False):
        """Tell whether ``id_string`` is a valid ORCID.

        A boolean already present in the storage manager is returned as is.
        Otherwise the identifier must pass ``syntax_ok`` and ``check_digit``
        and, only then, ``exists``; the outcome is written to the storage.

        :param id_string: the identifier, with or without the ``orcid:`` prefix.
        :param get_extra_info: when True return ``(validity, info_dict)``
            instead of the bare boolean.
        :return: ``bool``, or ``(bool, dict)`` when ``get_extra_info`` is True.
        """
        orcid = self.normalise(id_string, include_prefix=True)
        if orcid is None:
            if get_extra_info:
                return False, {"id": orcid, "valid": False}
            return False

        cached = self.storage_manager.get_value(orcid)
        if isinstance(cached, bool):
            if get_extra_info:
                return cached, {"id": orcid, "valid": cached}
            return cached

        # Cheap local checks first, so that malformed identifiers never
        # trigger a network request.
        well_formed = self.syntax_ok(orcid) and self.check_digit(orcid)

        if get_extra_info:
            if well_formed:
                found, info = self.exists(orcid, get_extra_info=True)
                validity = bool(found)
            else:
                validity = False
                # Report the same "id" that exists() would have reported:
                # the unprefixed form when the API is in use, the input otherwise.
                info = {"id": self.normalise(orcid) if self._use_api_service else orcid}
            # The stored/returned dict must agree with the returned boolean.
            info["valid"] = validity
            self.storage_manager.set_full_value(orcid, info)
            return validity, info

        validity = well_formed and self.exists(orcid)
        self.storage_manager.set_value(orcid, validity)
        return validity

    def normalise(self, id_string, include_prefix=False):
        """Rewrite ``id_string`` as ``XXXX-XXXX-XXXX-XXXX``.

        Every character other than ``0-9`` and ``X`` (after upper-casing) is
        dropped and the first 16 remaining characters are grouped by four.
        No length check is made here: shorter inputs give shorter groups,
        which ``syntax_ok`` rejects later.

        :param include_prefix: when True the result starts with ``orcid:``.
        :return: the normalised string, or None when ``id_string`` cannot be
            processed as text (e.g. it is None).
        """
        try:
            digits = sub("[^X0-9]", "", id_string.upper())
        except (AttributeError, TypeError):
            # Not a string-like value: nothing to normalise.
            return None

        # ``digits`` only holds "X" and digits, so it can never already
        # carry the prefix; the prefix depends on ``include_prefix`` alone.
        prefix = self._p if include_prefix else ""
        return "%s%s-%s-%s-%s" % (prefix, digits[:4], digits[4:8], digits[8:12], digits[12:16])

    def check_digit(self, orcid):
        """Verify the trailing check character of ``orcid``.

        The running total is doubled after adding each character but the
        last one; the expected check value is ``(12 - total % 11) % 11``,
        written as ``X`` when it equals 10.

        :param orcid: the identifier, with or without the ``orcid:`` prefix.
        :return: True when the last character matches the computed value;
            False otherwise, including for an empty identifier.
        """
        if orcid.startswith(self._p):
            orcid = orcid[len(self._p):]

        total = 0
        for char in sub("[^X0-9]", "", orcid.upper())[:-1]:
            total = (total + (10 if char == "X" else int(char))) * 2
        expected = (12 - total % 11) % 11

        # Slice instead of index so an empty identifier is simply invalid,
        # and upper-case so "x" is treated like "X" (as syntax_ok does).
        last_char = orcid[-1:].upper()
        return str(expected) == last_char or (expected == 10 and last_char == "X")

    def syntax_ok(self, id_string):
        """Tell whether ``id_string`` has the textual shape of an ORCID.

        The ``orcid:`` prefix is added when missing. The whole string must
        match, so trailing characters (a newline included) are rejected.
        """
        if not id_string.startswith(self._p):
            id_string = self._p + id_string
        return self._ORCID_PATTERN.fullmatch(id_string) is not None

    def exists(self, orcid, get_extra_info=False, allow_extra_api=None):
        """Check on the ORCID public API that ``orcid`` is registered.

        When the API service is disabled the identifier is reported as
        existing, since nothing can disprove it. Otherwise the record is
        requested up to ``_MAX_ATTEMPTS`` times; read timeouts, connection
        errors and non-200 answers all consume one attempt. JSON decoding
        errors are not handled here and propagate to the caller.

        :param orcid: the identifier, with or without the ``orcid:`` prefix.
        :param get_extra_info: when True return ``(validity, info_dict)``.
        :param allow_extra_api: unused; kept for interface compatibility.
        :return: ``bool``, or ``(bool, dict)`` when ``get_extra_info`` is True.
        """
        if not self._use_api_service:
            if get_extra_info:
                return True, {"id": orcid, "valid": True}
            return True

        self._headers["Accept"] = "application/json"
        orcid = self.normalise(orcid)
        info_dict = {"id": orcid}

        if orcid is not None:
            attempts_left = self._MAX_ATTEMPTS
            while attempts_left:
                attempts_left -= 1
                try:
                    response = get(self._api + quote(orcid), headers=self._headers, timeout=self._TIMEOUT_SECONDS)
                except ReadTimeout:
                    # Do nothing, just try again
                    continue
                except ConnectionError:
                    # Wait before retrying; pointless after the last attempt.
                    if attempts_left:
                        sleep(self._RETRY_DELAY_SECONDS)
                    continue

                if response.status_code != 200:
                    continue

                response.encoding = "utf-8"
                json_res = loads(response.text)
                # A missing or null "orcid-identifier" means "no match", not a crash.
                identifier = json_res.get("orcid-identifier") or {}
                valid_bool = identifier.get("path") == orcid
                if get_extra_info:
                    info_dict.update(self.extra_info(json_res))
                    # extra_info() always says valid=True: align it with the real outcome.
                    info_dict["valid"] = valid_bool
                    return valid_bool, info_dict
                return valid_bool

        # Either the identifier could not be normalised or every attempt failed.
        if get_extra_info:
            info_dict["valid"] = False
            return False, info_dict
        return False

    @classmethod
    def _dig(cls, data, *keys, default=""):
        """Follow ``keys`` through nested containers, returning ``default`` on any miss."""
        for key in keys:
            try:
                data = data[key]
            except cls._PAYLOAD_ERRORS:
                return default
        return data

    def _external_identifiers(self, person):
        """Collect ``{external-id-type: external-id-value}`` from a person record.

        NOTE(review): the ORCID v3.0 payload presumably exposes
        "external-identifiers" as a single object holding an
        "external-identifier" list; a list of such objects is accepted too.
        Any unexpected shape yields an empty dict.
        """
        try:
            containers = person["external-identifiers"]
            if isinstance(containers, dict):
                containers = [containers]
            collected = {}
            for container in containers:
                for entry in container["external-identifier"]:
                    collected[entry.get("external-id-type")] = entry.get("external-id-value")
            return collected
        except self._PAYLOAD_ERRORS:
            return {}

    def _history_date(self, api_response, field):
        """Return ``history[field].value`` as ``YYYY-MM-DD``, or "" when unavailable."""
        raw_value = self._dig(api_response, "history", field, "value", default=None)
        if raw_value is None:
            return ""
        try:
            return self.timestamp_to_date(raw_value)
        except (TypeError, ValueError, OverflowError, OSError):
            # Not a number, or outside the range datetime can represent.
            return ""

    def extra_info(self, api_response, choose_api=None, info_dict=None):
        """Extract descriptive metadata from an ORCID API record.

        Every field is optional: whatever cannot be found in
        ``api_response`` keeps its empty default ("" or {}).

        :param api_response: the decoded JSON record.
        :param choose_api: unused; kept for interface compatibility.
        :param info_dict: unused; kept for interface compatibility.
        :return: a dict with the keys ``valid`` (always True), ``family_name``,
            ``given_name``, ``email``, ``external_identifiers``,
            ``submission_date`` and ``update_date``.
        """
        person = self._dig(api_response, "person", default=None)

        emails = self._dig(person, "emails", "email", default=None)

        result = {}
        result["valid"] = True
        result["family_name"] = self._dig(person, "name", "family-name", "value")
        result["given_name"] = self._dig(person, "name", "given-names", "value")
        # The whole "email" value is stringified as is, when it is not empty.
        result["email"] = str(emails) if emails else ""
        result["external_identifiers"] = self._external_identifiers(person)
        result["submission_date"] = self._history_date(api_response, "submission-date")
        result["update_date"] = self._history_date(api_response, "last-modified-date")

        return result

    def timestamp_to_date(self, timestamp_value):
        """Convert a timestamp in milliseconds to a ``YYYY-MM-DD`` string.

        The conversion is done in UTC so that the resulting date does not
        depend on the timezone of the machine running the code.
        """
        moment = datetime.datetime.fromtimestamp(timestamp_value / 1000, tz=datetime.timezone.utc)
        return moment.strftime("%Y-%m-%d")