Coverage for oc_ds_converter / oc_idmanager / arxiv.py: 79%
131 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it>
4# SPDX-FileCopyrightText: 2026 Marta Soricetti <marta.soricetti@unibo.it>
5#
6# SPDX-License-Identifier: ISC
9from re import compile, match, search
10from time import sleep
11from urllib.parse import quote, unquote
13import xmltodict
14from oc_ds_converter.oc_idmanager import *
15from oc_ds_converter.oc_idmanager.base import IdentifierManager
16from requests import ReadTimeout, get
17from requests.exceptions import ConnectionError
18from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager
19from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
class ArXivManager(IdentifierManager):
    """Identifier manager for arXiv identifiers.

    The manager normalises arXiv identifiers, checks their syntax, optionally
    checks their existence through the arXiv web services, and caches the
    outcome of validations in a storage manager.
    """

    # Identifier body without version: either "NNNN.NNNN[N]" or the older
    # "archive[.XX]/NNNNNNN" form. The dot is escaped so that only a literal
    # "." is accepted as separator.
    _ID_CORE = r"(\d{4}\.\d{4,5}|[a-z\-]+(\.[A-Z]{2})?\/\d{7})"
    _RE_VERSIONED_ID = compile(_ID_CORE + r"(v\d+)")
    _RE_BARE_ID = compile(_ID_CORE)
    _RE_SYNTAX = compile(r"arxiv:" + _ID_CORE + r"(v\d+)?$")
    _RE_VERSION_SUFFIX = compile(r"(v\d+)$")
    _RE_NON_ID_CHARS = compile(r'[^0-9v.]')
    _RE_DOT_RUN = compile(r'\.+')
    _RE_API_URL = compile(r'(https?://export\.arxiv\.org/api/query\?search_query=all:)')
    _RE_ABS_URL = compile(r'(https?://arxiv\.org/abs/)')
    _RE_ARXIV_WORD = compile(r'\/?ar[Xx]iv\.?')

    def __init__(self, use_api_service: bool = True, storage_manager: StorageManager | None = None, testing: bool = True) -> None:
        """Build the arXiv manager.

        Args:
            use_api_service (bool): when False, `exists` never performs HTTP
                requests and reports every identifier as existing.
            storage_manager (StorageManager | None): cache for validation
                results. When None, a `RedisStorageManager` is created.
            testing (bool): forwarded to `RedisStorageManager` when the
                default storage manager is created.
        """
        super().__init__()
        self._use_api_service = use_api_service
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager

        self._p = "arxiv:"
        self._api = 'https://export.arxiv.org/api/query?search_query=all:'
        self._api_v = 'https://arxiv.org/abs/'
        self._headers = {
            "User-Agent": "Identifier Manager / OpenCitations Indexes "
            "(http://opencitations.net; mailto:contact@opencitations.net)"
        }

    def validated_as_id(self, id_string):
        """Return the validity stored for `id_string`, if any.

        Args:
            id_string (str): the key to look up in the storage manager.

        Returns:
            bool | None: the stored boolean, or None when the storage manager
            holds no boolean for this key.
        """
        stored = self.storage_manager.get_value(id_string)
        return stored if isinstance(stored, bool) else None

    def is_valid(self, id_string, get_extra_info=False):
        """Check if an arxiv is valid.

        Args:
            id_string (str): the arxiv to check
            get_extra_info (bool, optional): when True, a tuple
                (validity, info dictionary) is returned instead of a bool.

        Returns:
            bool: true if the arxiv is valid, false otherwise. When the
            identifier cannot be normalised a bare False is returned, even
            if `get_extra_info` is True.
        """
        arxiv = self.normalise(id_string, include_prefix=True)
        if not arxiv:
            return False

        cached = self.storage_manager.get_value(arxiv)
        if isinstance(cached, bool):
            # A previous validation is already stored: reuse it.
            if get_extra_info:
                return cached, {"id": arxiv, "valid": cached}
            return cached

        if get_extra_info:
            info = self.exists(arxiv, get_extra_info=True)
            self.storage_manager.set_full_value(arxiv, info[1])
            return (info[0] and self.syntax_ok(arxiv)), info[1]

        validity_check = self.syntax_ok(arxiv) and self.exists(arxiv)
        self.storage_manager.set_value(arxiv, validity_check)
        return validity_check

    def normalise(self, id_string, include_prefix=False):
        """It returns the arxiv normalized.

        The normalised form always carries a version: "v1" is appended when
        the input does not specify one.

        Args:
            id_string (str): the arxiv to normalize.
            include_prefix (bool, optional): indicates if include the prefix. Defaults to False.

        Returns:
            str: the normalized arxiv, or None when no identifier can be
            extracted from the input.
        """
        if not id_string:
            return None

        # Percent-decoding must happen before the clean-up below, otherwise
        # the "%" signs are stripped and the escapes can no longer be decoded.
        candidate = unquote(str(id_string)).strip().lower()

        if candidate.startswith(self._p):
            candidate = candidate[len(self._p):]

        candidate = self._RE_DOT_RUN.sub('.', candidate)
        candidate = self._RE_ABS_URL.sub('', candidate)
        candidate = self._RE_API_URL.sub('', candidate)
        candidate = self._RE_NON_ID_CHARS.sub('', candidate)
        # NOTE(review): at this point only digits, "v" and "." are left, so
        # this substitution cannot match any more; kept in its original
        # position so that the clean-up order is unchanged.
        candidate = self._RE_ARXIV_WORD.sub('', candidate)

        prefix = self._p if include_prefix else ""

        # Prefer an identifier that carries an explicit version.
        versioned = self._RE_VERSIONED_ID.search(candidate)
        if versioned:
            return "%s%s" % (prefix, versioned.group(0))

        # Otherwise fall back to a version-less identifier and default to v1.
        bare = self._RE_BARE_ID.search(candidate)
        if bare:
            return "%s%s" % (prefix, bare.group(0) + "v1")

        return None

    def syntax_ok(self, id_string):
        """Check the syntax of an arxiv identifier.

        Args:
            id_string (str): the arxiv to check, with or without the
                "arxiv:" prefix.

        Returns:
            bool: True if the identifier matches the expected pattern
            (the version suffix is optional), False otherwise.
        """
        if not id_string.startswith(self._p):
            id_string = self._p + id_string
        return bool(self._RE_SYNTAX.match(id_string))

    def exists(self, arxiv_full, get_extra_info=False, allow_extra_api=None):
        """
        Returns True if the id exists, False otherwise. Not all child class check id existence because of API policies

        Identifiers with version v1 are looked up without the version through
        the query API; any other version is checked by requesting its
        abstract page. At most three requests are attempted.

        Args:
            arxiv_full (str): the arxiv string for the api request
            get_extra_info (bool, optional): when True, a tuple
                (existence, info dictionary) is returned instead of a bool.
            allow_extra_api: unused by this manager.
        Returns:
            bool: True if the arxiv exists (is registered), False otherwise.
            True is also returned when the API service is disabled.
        """
        def answer(valid):
            # Shape the result according to get_extra_info.
            if get_extra_info:
                return valid, {"valid": valid}
            return valid

        if not self._use_api_service:
            return answer(True)

        arxiv_full_norm = self.normalise(arxiv_full, include_prefix=False)
        if not arxiv_full_norm:
            return answer(False)

        version_match = self._RE_VERSION_SUFFIX.search(arxiv_full_norm)
        version = version_match[1] if version_match else ""

        check_version_page = bool(version) and version != "v1"
        if check_version_page:
            api = self._api_v
        else:
            if version == "v1":
                # v1 always exists, and the base API is more efficient,
                # so we just check the existence of the base ARXIV id
                arxiv_full_norm = arxiv_full_norm[:-2]
            api = self._api

        for _ in range(3):
            try:
                r = get(
                    api + quote(arxiv_full_norm),
                    headers=self._headers,
                    timeout=30,
                )
            except ReadTimeout:
                # Do nothing, just try again
                continue
            except ConnectionError:
                # Sleep 5 seconds, then try again
                sleep(5)
                continue

            if r.status_code != 200:
                return answer(False)

            if check_version_page:
                # The abstract page of the requested version was served.
                return answer(True)

            obj = xmltodict.parse(r.text)
            feed = obj.get("feed") or {}
            total = feed.get("opensearch:totalResults")
            if isinstance(total, dict):
                # xmltodict returns a mapping when the element carries
                # attributes; the element text is stored under "#text".
                total = total.get("#text")
            try:
                results_n = int(total)
            except (TypeError, ValueError):
                results_n = 0

            if results_n > 0:
                if get_extra_info:
                    return True, self.extra_info(obj)
                return True
            return answer(False)

        # Every attempt failed with a timeout or a connection error.
        return answer(False)

    def extra_info(self, api_response, choose_api=None, info_dict: dict | None = None):
        """Build the info dictionary for an existing identifier.

        Args:
            api_response: the parsed API response (currently unused).
            choose_api: currently unused.
            info_dict (dict | None): currently unused.

        Returns:
            dict: a dictionary containing only {"valid": True}.
        """
        # to be implemented
        return {"valid": True}