Coverage for oc_ds_converter / oc_idmanager / arxiv.py: 79%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it> 

4# SPDX-FileCopyrightText: 2026 Marta Soricetti <marta.soricetti@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8 

9from re import compile, match, search 

10from time import sleep 

11from urllib.parse import quote, unquote 

12 

13import xmltodict 

14from oc_ds_converter.oc_idmanager import * 

15from oc_ds_converter.oc_idmanager.base import IdentifierManager 

16from requests import ReadTimeout, get 

17from requests.exceptions import ConnectionError 

18from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager 

19from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

20 

21 

class ArXivManager(IdentifierManager):
    """Identifier manager for arXiv identifiers.

    The manager normalises arXiv ids to the form ``<id>v<version>``
    (optionally prefixed with ``arxiv:``), checks their syntax, optionally
    checks their existence through the arXiv web services, and records
    validation outcomes in a storage manager.
    """

    # Patterns used by normalise()/syntax_ok()/exists(), compiled once at
    # class creation instead of on every call.
    _RE_DISALLOWED_CHARS = compile(r'[^0-9v.]')
    _RE_REPEATED_DOTS = compile(r'\.+')
    _RE_QUERY_API_URL = compile(r'(https?://export\.arxiv\.org/api/query\?search_query=all:)')
    _RE_ABS_URL = compile(r'(https?://arxiv\.org/abs/)')
    _RE_ARXIV_LABEL = compile(r'\/?ar[Xx]iv\.?')
    # The dot between the two digit groups is escaped on purpose: an
    # unescaped "." matches any character, which made dot-less digit runs
    # such as "123456789" look like identifiers.
    _ID_BODY = r"(\d{4}\.\d{4,5}|[a-z\-]+(\.[A-Z]{2})?\/\d{7})"
    _RE_ID = compile(_ID_BODY)
    _RE_ID_WITH_VERSION = compile(_ID_BODY + r"(v\d+)")
    _RE_PREFIXED_ID = compile(r"arxiv:" + _ID_BODY + r"(v\d+)?$")
    _RE_TRAILING_VERSION = compile(r"(v\d+)$")

    def __init__(self, use_api_service: bool = True, storage_manager: StorageManager | None = None, testing: bool = True) -> None:
        """Build an arXiv identifier manager.

        Args:
            use_api_service: when False, ``exists`` performs no HTTP request.
            storage_manager: storage used to cache validation outcomes. When
                None, a ``RedisStorageManager(testing=testing)`` is created.
            testing: forwarded to ``RedisStorageManager`` only when no
                ``storage_manager`` is supplied.
        """
        super().__init__()
        self._use_api_service = use_api_service
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager

        self._p = "arxiv:"
        self._api = 'https://export.arxiv.org/api/query?search_query=all:'
        self._api_v = 'https://arxiv.org/abs/'
        self._headers = {
            "User-Agent": "Identifier Manager / OpenCitations Indexes "
            "(http://opencitations.net; mailto:contact@opencitations.net)"
        }

    def validated_as_id(self, id_string):
        """Return the validity recorded in storage for ``id_string``.

        The id is looked up exactly as given (it is not normalised here).

        Returns:
            bool | None: the stored boolean, or None when the storage holds
            no boolean value for this id.
        """
        stored_value = self.storage_manager.get_value(id_string)
        if isinstance(stored_value, bool):
            return stored_value
        return None

    def is_valid(self, id_string, get_extra_info=False):
        """Check if an arxiv is valid.

        The id is normalised first. A boolean already recorded in storage is
        returned as is; otherwise the id is valid when its syntax is correct
        and ``exists`` reports it, and the outcome is written to storage.

        Args:
            id_string (str): the arxiv to check
            get_extra_info (bool, optional): when True, return a
                ``(valid, info_dict)`` tuple instead of a bare boolean.

        Returns:
            bool | tuple[bool, dict]: true if the arxiv is valid, false
            otherwise, paired with an info dictionary when
            ``get_extra_info`` is True.
        """
        arxiv = self.normalise(id_string, include_prefix=True)
        if not arxiv:
            # NOTE(review): a bare False is returned here even when
            # get_extra_info is True; kept unchanged for existing callers.
            return False

        stored_value = self.storage_manager.get_value(arxiv)
        if isinstance(stored_value, bool):
            if get_extra_info:
                return stored_value, {"id": arxiv, "valid": stored_value}
            return stored_value

        if get_extra_info:
            found, info = self.exists(arxiv, get_extra_info=True)
            self.storage_manager.set_full_value(arxiv, info)
            return (found and self.syntax_ok(arxiv)), info

        validity_check = self.syntax_ok(arxiv) and self.exists(arxiv)
        self.storage_manager.set_value(arxiv, validity_check)
        return validity_check

    def normalise(self, id_string, include_prefix=False):
        """It returns the arxiv normalized.

        The input is lower-cased, stripped of the ``arxiv:`` prefix and of
        the two known arXiv URL prefixes, and reduced to digits, ``v`` and
        dots before an identifier is searched in it. When the identifier
        carries no version, ``v1`` is appended.

        Args:
            id_string (str): the arxiv to normalize.
            include_prefix (bool, optional): indicates if include the prefix. Defaults to False.

        Returns:
            str | None: the normalized arxiv, or None when the input is
            empty or no identifier can be found in it.
        """
        if not id_string:
            return None

        id_string = str(id_string).strip().lower()
        if id_string.startswith(self._p):
            id_string = id_string[len(self._p):]

        id_string = self._RE_REPEATED_DOTS.sub('.', id_string)
        id_string = self._RE_ABS_URL.sub('', id_string)
        id_string = self._RE_QUERY_API_URL.sub('', id_string)
        id_string = self._RE_DISALLOWED_CHARS.sub('', id_string)
        # NOTE(review): after the previous substitution only digits, "v" and
        # "." are left, so the two steps below cannot change the string any
        # more; they are kept in their original position to preserve
        # behaviour. Confirm whether they were meant to run earlier.
        id_string = self._RE_ARXIV_LABEL.sub('', id_string)
        id_string = unquote(id_string)

        prefix = self._p if include_prefix else ""

        # Prefer an identifier that already carries an explicit version.
        versioned = self._RE_ID_WITH_VERSION.search(id_string)
        if versioned:
            return "%s%s" % (prefix, versioned.group(0))

        # Otherwise take the first bare identifier and default to version 1.
        unversioned = self._RE_ID.search(id_string)
        if unversioned:
            return "%s%s" % (prefix, unversioned.group(0) + "v1")

        return None

    def syntax_ok(self, id_string):
        """Check the syntax of an arxiv id.

        Args:
            id_string (str): the id, with or without the ``arxiv:`` prefix.

        Returns:
            bool: True if the prefixed id matches the arXiv id pattern
            (with an optional ``v<N>`` version suffix), False otherwise.
        """
        if not id_string.startswith(self._p):
            id_string = self._p + id_string
        return bool(self._RE_PREFIXED_ID.match(id_string))

    @staticmethod
    def _existence_result(found, get_extra_info, info=None):
        """Shape the return value of ``exists`` for the requested mode."""
        if get_extra_info:
            return found, ({"valid": found} if info is None else info)
        return found

    def exists(self, arxiv_full, get_extra_info=False, allow_extra_api=None):
        """
        Returns True if the id exists, False otherwise.

        When the API service is disabled no request is made and the id is
        reported as existing. Otherwise versions other than ``v1`` are
        checked against the ``arxiv.org/abs/`` page (HTTP 200 means the id
        exists), while ``v1`` ids are checked without the version suffix
        against the export query API, where existence means a positive
        ``opensearch:totalResults`` value in the returned feed.

        Up to three attempts are made: a read timeout is retried
        immediately, a connection error is retried after 5 seconds.

        Args:
            arxiv_full (str): the arxiv string for the api request
            get_extra_info (bool, optional): when True, return a
                ``(exists, info_dict)`` tuple instead of a bare boolean.
            allow_extra_api: not used by this manager.
        Returns:
            bool | tuple[bool, dict]: True if the arxiv exists (is
            registered), False otherwise, paired with an info dictionary
            when ``get_extra_info`` is True.
        """
        if not self._use_api_service:
            return self._existence_result(True, get_extra_info)

        arxiv_full_norm = self.normalise(arxiv_full, include_prefix=False)
        if not arxiv_full_norm:
            return self._existence_result(False, get_extra_info)

        version_match = self._RE_TRAILING_VERSION.search(arxiv_full_norm)
        version = version_match[1] if version_match else ""
        check_abs_page = bool(version) and version != "v1"

        if check_abs_page:
            api = self._api_v
        else:
            if version == "v1":
                # v1 always exists, and the base API is more efficient,
                # so we just check the existence of the base ARXIV id
                arxiv_full_norm = arxiv_full_norm[:-2]
            api = self._api

        connection_failed = False
        attempts_left = 3
        while attempts_left:
            attempts_left -= 1
            try:
                response = get(
                    api + quote(arxiv_full_norm),
                    headers=self._headers,
                    timeout=30,
                )
            except ReadTimeout:
                # Do nothing, just try again
                continue
            except ConnectionError:
                connection_failed = True
                # Wait before retrying; sleeping after the last attempt
                # would only delay the answer.
                if attempts_left:
                    sleep(5)
                continue

            if response.status_code != 200:
                return self._existence_result(False, get_extra_info)

            if check_abs_page:
                # The abstract page of that specific version was served.
                return self._existence_result(True, get_extra_info)

            parsed = xmltodict.parse(f'{response.text}')
            feed = parsed.get("feed")
            # A response without a <feed> element counts as zero results.
            total_results = feed.get("opensearch:totalResults") if feed else None
            try:
                results_n = int(total_results)
            except (TypeError, ValueError):
                results_n = 0

            if results_n > 0:
                return self._existence_result(
                    True,
                    get_extra_info,
                    self.extra_info(parsed) if get_extra_info else None,
                )
            return self._existence_result(False, get_extra_info)

        # NOTE(review): when every attempt ended in a read timeout the id is
        # reported as existing; only a connection error turns the outcome
        # into False. Kept as in the original; confirm this is intended.
        return self._existence_result(not connection_failed, get_extra_info)

    def extra_info(self, api_response, choose_api=None, info_dict: dict | None = None):
        """Build the extra-info dictionary for an existing id.

        Currently only ``{"valid": True}`` is returned; the arguments are
        not inspected yet.
        """
        result = {}
        result["valid"] = True
        # to be implemented
        return result

220