Coverage for oc_ds_converter / oc_idmanager / openalex.py: 74%

102 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2024 Elia Rizzetto <elia.rizzetto2@unibo.it> 

2# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# 

4# SPDX-License-Identifier: ISC 

5 

6from oc_ds_converter.oc_idmanager.base import IdentifierManager 

7from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager 

8from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

9from re import sub, match 

10from requests import ReadTimeout, get 

11from requests.exceptions import ConnectionError 

12from json import loads 

13from time import sleep 

14 

15 

class OpenAlexManager(IdentifierManager):
    """Identifier manager for OpenAlex identifiers.

    Only Work (``W...``) and Source (``S...``) identifiers are accepted by
    :meth:`syntax_ok`. Validation results are read from and written to the
    configured storage manager.
    """

    def __init__(self, use_api_service: bool = True, storage_manager: StorageManager | None = None, testing: bool = True) -> None:
        """OpenAlex manager constructor.

        Args:
            use_api_service: when False, :meth:`exists` never contacts the
                OpenAlex API and reports every identifier as existing.
            storage_manager: storage used to cache validation results; when
                None a ``RedisStorageManager(testing=testing)`` is created.
            testing: forwarded to ``RedisStorageManager`` when it is created
                here; ignored if ``storage_manager`` is given.
        """
        super().__init__()
        if storage_manager is None:
            self.storage_manager = RedisStorageManager(testing=testing)
        else:
            self.storage_manager = storage_manager
        self._api = "https://api.openalex.org/"
        self._api_works_route = r"https://api.openalex.org/works/"
        self._api_sources_route = r"https://api.openalex.org/sources/"
        self._use_api_service = use_api_service
        self._p = "openalex:"
        self._url_id_pref = "https://openalex.org/"
        self._headers = {
            "User-Agent": "Identifier Manager / OpenCitations Indexes "
            "(http://opencitations.net; mailto:contact@opencitations.net)"
        }

    def is_valid(self, oal_id, get_extra_info=False):
        """Tell whether ``oal_id`` is a valid OpenAlex identifier.

        The identifier is normalised, then looked up in the storage manager.
        If the stored value is a boolean it is returned as is. Otherwise the
        identifier is checked for syntax and existence and the outcome is
        written back to the storage manager.

        Returns:
            ``False`` if the identifier cannot be normalised. A plain boolean
            when a cached boolean is found (even if ``get_extra_info`` is
            True) or when ``get_extra_info`` is False. Otherwise a
            ``(valid, info_dict)`` tuple.
        """
        oal_id = self.normalise(oal_id, include_prefix=True)
        if oal_id is None:
            return False

        id_validation_value = self.storage_manager.get_value(oal_id)
        if isinstance(id_validation_value, bool):
            return id_validation_value

        if get_extra_info:
            info = self.exists(oal_id, get_extra_info=True)
            self.storage_manager.set_full_value(oal_id, info[1])
            return (info[0] and self.syntax_ok(oal_id)), info[1]

        # syntax_ok is evaluated first so that no API request is made for
        # identifiers that are syntactically invalid.
        validity_check = self.syntax_ok(oal_id) and self.exists(oal_id)
        self.storage_manager.set_value(oal_id, validity_check)
        return validity_check

    def normalise(self, id_string, include_prefix=False):
        """Return the canonical form of an OpenAlex identifier.

        Whitespace and NUL characters are removed, the ``openalex:`` prefix
        and any OpenAlex API/URL prefix are stripped, and the remainder is
        upper-cased.

        Args:
            id_string: the identifier to normalise.
            include_prefix: when True the result is prefixed with
                ``openalex:``.

        Returns:
            The normalised identifier, or ``None`` if ``id_string`` is not a
            string.
        """
        try:
            # Clean first, so that a prefix preceded by stray whitespace
            # (e.g. " openalex:W1") is still recognised and removed.
            oal_string = sub(r"\0+", "", sub(r"\s+", "", id_string))
            oal_string = oal_string.removeprefix(self._p)

            # The more specific API routes must be removed before the bare
            # API root, which is a prefix of both.
            for url_prefix in (
                self._api_works_route,
                self._api_sources_route,
                self._api,
                self._url_id_pref,
            ):
                oal_string = oal_string.replace(url_prefix, '', 1)

            oal_string = oal_string.upper().strip()
            return f"{self._p if include_prefix else ''}{oal_string}"
        except (AttributeError, TypeError):
            # Non-string input (None, numbers, bytes, ...) cannot be normalised
            return None

    def syntax_ok(self, id_string):
        """Check that ``id_string`` looks like an OpenAlex Work or Source ID.

        The ``openalex:`` prefix is added when missing. A valid identifier is
        ``W`` or ``S`` followed by a positive integer without leading zeros.
        """
        if not id_string.startswith(self._p):
            id_string = self._p + id_string
        # \Z rather than $: "$" would also accept a trailing newline.
        return bool(match(r"^openalex:[WS][1-9]\d*\Z", id_string))

    @staticmethod
    def _outcome(valid, identifier, get_extra_info):
        """Shape the result of exists(): a bool, or (bool, info dict) when extra info is requested."""
        if get_extra_info:
            return valid, {'id': identifier, 'valid': valid}
        return valid

    def exists(self, openalex_id_full, get_extra_info=False, allow_extra_api=None):
        """Check through the OpenAlex API whether an identifier exists.

        Up to three requests are attempted. Read timeouts, connection errors,
        HTTP 429 responses and server errors lead to a retry; any other 4xx
        response is a definitive "does not exist".

        Args:
            openalex_id_full: the identifier, with or without the
                ``openalex:`` prefix.
            get_extra_info: when True a ``(bool, dict)`` tuple is returned,
                the dict having the keys ``'id'`` and ``'valid'``.
            allow_extra_api: unused; kept for interface compatibility.

        Returns:
            ``True`` when the API service is disabled or when the API returns
            a record whose ``id`` matches the identifier; ``False`` otherwise,
            including when all attempts fail.
        """
        if not openalex_id_full.startswith(self._p):
            openalex_id_full = self._p + openalex_id_full

        if not self._use_api_service:
            # Without the API there is nothing to check against.
            return self._outcome(True, openalex_id_full, get_extra_info)

        oal_id = self.normalise(openalex_id_full)  # None or unprefixed ID
        if not oal_id:
            return self._outcome(False, None, get_extra_info)
        pref_oalid = self._p + oal_id

        tentative = 3
        while tentative:
            tentative -= 1
            try:
                r = get(self._api + oal_id, headers=self._headers, timeout=30)
            except ReadTimeout:
                # Do nothing, just try again
                continue
            except ConnectionError:
                # Wait 5 seconds before trying again (pointless after the last attempt)
                if tentative:
                    sleep(5)
                continue

            if r.status_code == 200:
                r.encoding = "utf-8"
                json_res = loads(r.text)
                try:
                    found = json_res['id'] == (self._url_id_pref + oal_id)
                except KeyError:
                    found = False
                return self._outcome(found, pref_oalid, get_extra_info)

            if r.status_code == 429:
                # Only handles per-second rate limits (not per-day rate limits)
                if tentative:
                    sleep(1)
            elif 400 <= r.status_code < 500:
                return self._outcome(False, pref_oalid, get_extra_info)

        # All attempts failed without a definitive answer
        return self._outcome(False, openalex_id_full, get_extra_info)

    def extra_info(self, api_response, choose_api=None, info_dict=None):
        """Build the extra-information dictionary for an API response.

        Not implemented yet: the arguments are ignored and the result only
        contains ``{"valid": True}``.
        """
        result = {}
        result["valid"] = True
        # to be implemented
        return result