Coverage for oc_ds_converter / oc_idmanager / openalex.py: 74%
102 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2024 Elia Rizzetto <elia.rizzetto2@unibo.it>
2# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3#
4# SPDX-License-Identifier: ISC
6from oc_ds_converter.oc_idmanager.base import IdentifierManager
7from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager
8from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
9from re import sub, match
10from requests import ReadTimeout, get
11from requests.exceptions import ConnectionError
12from json import loads
13from time import sleep
class OpenAlexManager(IdentifierManager):
    """Identifier manager for OpenAlex identifiers.

    The manager normalises OpenAlex IDs, checks their syntax (only work
    ``W<number>`` and source ``S<number>`` identifiers are accepted) and can
    confirm their existence by querying the OpenAlex API. Validation results
    are read from and written to the configured storage manager.
    """

    def __init__(self, use_api_service: bool = True, storage_manager: StorageManager | None = None, testing: bool = True) -> None:
        """Configure the storage backend and the OpenAlex constants.

        Args:
            use_api_service: When False, ``exists`` never contacts the API and
                reports every identifier as existing.
            storage_manager: Storage used for validation results. When None,
                a ``RedisStorageManager(testing=testing)`` is created.
            testing: Forwarded to ``RedisStorageManager``; ignored when
                ``storage_manager`` is supplied.
        """
        super().__init__()
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager
        self._api = "https://api.openalex.org/"
        self._api_works_route = r"https://api.openalex.org/works/"
        self._api_sources_route = r"https://api.openalex.org/sources/"
        self._use_api_service = use_api_service
        self._p = "openalex:"
        self._url_id_pref = "https://openalex.org/"
        self._headers = {
            "User-Agent": "Identifier Manager / OpenCitations Indexes "
            "(http://opencitations.net; mailto:contact@opencitations.net)"
        }

    def is_valid(self, oal_id, get_extra_info=False):
        """Tell whether ``oal_id`` is a valid OpenAlex identifier.

        The identifier is normalised first. A boolean already held by the
        storage manager is returned as is; otherwise the identifier is
        checked with ``syntax_ok`` and ``exists`` and the outcome is stored.

        Args:
            oal_id: The identifier to validate, with or without prefix.
            get_extra_info: When True and no stored boolean is available,
                return a ``(valid, info)`` tuple instead of a bare boolean.

        Returns:
            A boolean, or a ``(bool, dict)`` tuple as described above.

        NOTE(review): when normalisation fails or a stored boolean is found,
        a bare boolean is returned even if ``get_extra_info`` is True.
        """
        oal_id = self.normalise(oal_id, include_prefix=True)
        if oal_id is None:
            return False

        stored = self.storage_manager.get_value(oal_id)
        if isinstance(stored, bool):
            return stored

        if get_extra_info:
            found, info = self.exists(oal_id, get_extra_info=True)
            self.storage_manager.set_full_value(oal_id, info)
            return (found and self.syntax_ok(oal_id)), info

        validity_check = self.syntax_ok(oal_id) and self.exists(oal_id)
        self.storage_manager.set_value(oal_id, validity_check)
        return validity_check

    def normalise(self, id_string, include_prefix=False):
        """Reduce an OpenAlex identifier to its canonical upper-case form.

        The ``openalex:`` prefix, whitespace, NUL characters and the known
        OpenAlex URL prefixes (API routes and ``https://openalex.org/``) are
        removed before upper-casing. No syntax check is performed here.

        Args:
            id_string: The identifier, optionally prefixed or given as a URL.
            include_prefix: When True, prepend ``openalex:`` to the result.

        Returns:
            The normalised identifier, or None when ``id_string`` does not
            behave like a string (e.g. it is None).
        """
        try:
            if id_string.startswith(self._p):
                oal_string = id_string[len(self._p):]
            else:
                oal_string = id_string

            oal_string = sub(r"\s+", "", oal_string)
            oal_string = sub(r"\0+", "", oal_string)

            # The specific routes must go before the bare API root, otherwise
            # "works/" or "sources/" would be left in front of the ID.
            for url_prefix in (
                self._api_works_route,
                self._api_sources_route,
                self._api,
                self._url_id_pref,
            ):
                oal_string = oal_string.replace(url_prefix, '', 1)

            oal_string = oal_string.upper().strip()
        except (AttributeError, TypeError):
            # Non-string input cannot be normalised. Unlike a bare except,
            # this lets KeyboardInterrupt and SystemExit propagate.
            return None
        return f"{self._p if include_prefix else ''}{oal_string}"

    def syntax_ok(self, id_string):
        """Check that ``id_string`` has the shape ``openalex:[WS]<number>``.

        The prefix is added when missing. The number must not start with 0.

        Args:
            id_string: The identifier, with or without the prefix.

        Returns:
            True when the syntax is correct, False otherwise.
        """
        if not id_string.startswith(self._p):
            id_string = self._p + id_string
        # \Z instead of $ so a trailing newline is rejected; [0-9] instead of
        # \d so only ASCII digits are accepted.
        return match(r"^openalex:[WS][1-9][0-9]*\Z", id_string) is not None

    def exists(self, openalex_id_full, get_extra_info=False, allow_extra_api=None):
        """Check through the OpenAlex API whether an identifier exists.

        Up to three requests are attempted. A 200 response is considered a
        match when its ``id`` field equals ``https://openalex.org/<ID>``; a
        429 response waits one second before the next attempt; any other 4xx
        response means the identifier does not exist. When every attempt
        fails the identifier is reported as not existing.

        Args:
            openalex_id_full: The identifier, with or without the prefix.
            get_extra_info: When True, return a ``(valid, info)`` tuple where
                ``info`` is ``{'id': ..., 'valid': ...}``.
            allow_extra_api: Unused by this manager.

        Returns:
            A boolean, or a ``(bool, dict)`` tuple if ``get_extra_info``.
            True is returned without any request when the manager was created
            with ``use_api_service=False``.
        """
        if not openalex_id_full.startswith(self._p):
            openalex_id_full = self._p + openalex_id_full

        def answer(valid, reported_id):
            # Shape the outcome according to get_extra_info.
            if get_extra_info:
                return valid, {'id': reported_id, 'valid': valid}
            return valid

        if not self._use_api_service:
            return answer(True, openalex_id_full)

        oal_id = self.normalise(openalex_id_full)  # unprefixed ID, or None
        if not oal_id:
            return answer(False, None)
        pref_oalid = self._p + oal_id

        for _ in range(3):
            try:
                r = get(self._api + oal_id, headers=self._headers, timeout=30)
            except ReadTimeout:
                # Just try again
                continue
            except ConnectionError:
                # Sleep 5 seconds, then try again
                sleep(5)
                continue

            if r.status_code == 200:
                r.encoding = "utf-8"
                json_res = loads(r.text)
                try:
                    found = json_res['id'] == (self._url_id_pref + oal_id)
                except KeyError:
                    found = False
                return answer(found, pref_oalid)
            if r.status_code == 429:
                sleep(1)  # only handles per-second rate limits (not per-day rate limits)
            elif 400 <= r.status_code < 500:
                return answer(False, pref_oalid)

        # Every attempt failed without a conclusive response.
        return answer(False, openalex_id_full)

    def extra_info(self, api_response, choose_api=None, info_dict=None):
        """Return extra information extracted from an API response.

        Not implemented yet: the arguments are ignored and the result only
        carries ``{"valid": True}``. ``info_dict`` defaults to None rather
        than a shared mutable dictionary.
        """
        # to be implemented
        return {"valid": True}