Coverage for oc_ds_converter / oc_idmanager / ror.py: 73%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# SPDX-FileCopyrightText: 2024 Elia Rizzetto <elia.rizzetto2@unibo.it> 

3# SPDX-FileCopyrightText: 2026 Marta Soricetti <marta.soricetti@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7 

8from json import loads 

9from re import match, sub 

10from time import sleep 

11from typing import Optional 

12from urllib.parse import quote, unquote 

13 

14from oc_ds_converter.oc_idmanager.base import IdentifierManager 

15from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

16from oc_ds_converter.oc_idmanager.oc_data_storage.in_memory_manager import InMemoryStorageManager 

17from requests import ReadTimeout, get 

18from requests.exceptions import ConnectionError 

19 

20 

class RORManager(IdentifierManager):
    """Identifier manager for ROR (Research Organization Registry) identifiers.

    Provides normalisation, syntactic validation and (optionally) an existence
    check against the ROR REST API. Validation outcomes are cached in a
    storage manager, keyed by the normalised, ``ror:``-prefixed identifier.
    """

    def __init__(self, use_api_service=True, storage_manager: Optional[StorageManager] = None):
        """ROR manager constructor.

        Args:
            use_api_service: when True, ``exists`` queries the ROR API; when
                False, ``exists`` reports every ID as existing without any
                network access.
            storage_manager: cache for validation results. A fresh
                ``InMemoryStorageManager`` is created when omitted.
        """
        super().__init__()
        self._api = "https://api.ror.org/organizations/"
        self._use_api_service = use_api_service
        self.storage_manager = (
            InMemoryStorageManager() if storage_manager is None else storage_manager
        )
        self._p = "ror:"

    def validated_as_id(self, id_string) -> Optional[bool]:
        """Return the cached validation outcome for ``id_string``, if any.

        The identifier is looked up exactly as given (it is not normalised
        here). Returns the stored boolean, or None when the storage manager
        does not hold a boolean for it.
        """
        cached = self.storage_manager.get_value(id_string)
        return cached if isinstance(cached, bool) else None

    def is_valid(self, ror_id, get_extra_info=False):
        """Check whether ``ror_id`` is a syntactically correct, existing ROR ID.

        The ID is normalised first; a cached boolean outcome is reused when
        available, otherwise the syntax and existence checks are run and the
        outcome is stored.

        Args:
            ror_id: the identifier, with or without prefix / URL form.
            get_extra_info: when True, return ``(valid, info_dict)`` instead
                of a bare boolean (see the note on cache hits below).

        Returns:
            ``bool``, or ``(bool, dict)`` when ``get_extra_info`` is True and
            the answer did not come from the cache.
        """
        ror_id = self.normalise(ror_id, include_prefix=True)

        if ror_id is None:
            if get_extra_info:
                return False, {"id": ror_id, "valid": False}
            return False

        cached = self.storage_manager.get_value(ror_id)
        if isinstance(cached, bool):
            # NOTE(review): a cache hit returns a bare bool even when
            # get_extra_info is True. Kept as-is because callers may rely on
            # it; confirm before changing the return shape.
            return cached

        if get_extra_info:
            exists_ok, info = self.exists(ror_id, get_extra_info=True)
            validity_check = exists_ok and self.syntax_ok(ror_id)
            # Keep the stored/returned dict in agreement with the returned
            # boolean: previously "valid" reflected only the existence check,
            # so a syntactically wrong ID could be stored as valid (e.g. when
            # the API service is disabled).
            info["valid"] = validity_check
            self.storage_manager.set_full_value(ror_id, info)
            return validity_check, info

        # Syntax is checked first so malformed IDs never trigger an API call.
        validity_check = self.syntax_ok(ror_id) and self.exists(ror_id)
        self.storage_manager.set_value(ror_id, validity_check)
        return validity_check

    def normalise(self, id_string, include_prefix=False) -> Optional[str]:
        """Return the canonical lowercase form of a ROR ID, or None on failure.

        Strips the ``ror:`` prefix, percent-decodes the value, removes all
        whitespace, an optional leading ``http(s)://``, ``www.`` and
        ``ror.org/`` (matched case-insensitively) and NUL characters, then
        lowercases the result.

        Args:
            id_string: the raw identifier.
            include_prefix: when True, the result is prefixed with ``ror:``.

        Returns:
            The normalised string, or None if ``id_string`` cannot be
            processed (e.g. it is None or not a string).
        """
        try:
            if id_string.startswith(self._p):
                bare_id = id_string[len(self._p):]
            else:
                bare_id = id_string
            bare_id = sub(r"\s+", "", unquote(bare_id))
            # Scheme and host are case-insensitive, so strip them regardless
            # of case; the remainder is lowercased below anyway.
            bare_id = sub(r"(?i)^(https?://)?(www\.)?(ror\.org/)?", "", bare_id)
            bare_id = sub(r"\0+", "", bare_id)
            prefix = self._p if include_prefix else ""
            return prefix + bare_id.strip().lower()
        except Exception:
            # Deliberately best-effort: any processing error yields None.
            # Not a bare ``except`` so KeyboardInterrupt/SystemExit propagate.
            return None

    def syntax_ok(self, id_string) -> bool:
        """Return True if ``id_string`` has the shape of a ROR ID.

        The expected shape (after adding the ``ror:`` prefix if missing) is a
        leading ``0``, six characters drawn from digits and lowercase letters
        except i, l, o and u, followed by two digits. No protocol or domain
        is accepted here.
        """
        if not id_string.startswith(self._p):
            id_string = self._p + id_string

        # No "|" inside the character class (it would be a literal pipe), and
        # "\Z" instead of "$" so a trailing newline is not accepted.
        return match(r"^ror:0[a-hj-km-np-tv-z0-9]{6}[0-9]{2}\Z", id_string) is not None

    def exists(self, ror_id_full, get_extra_info=False, allow_extra_api=None):
        """Check through the ROR API whether the identifier is registered.

        Up to three requests are attempted. A 200 response is positive when
        the JSON body has a truthy ``"id"``; a 4xx response is negative; any
        other status, a read timeout or a connection error triggers a retry.
        If every attempt fails the ID is reported as not existing. When the
        API service is disabled the ID is reported as existing without any
        request.

        Args:
            ror_id_full: the identifier, in any form ``normalise`` accepts.
            get_extra_info: when True, return ``(exists, {"valid": exists})``.
            allow_extra_api: unused; kept for interface compatibility.

        Returns:
            ``bool``, or ``(bool, dict)`` when ``get_extra_info`` is True.
        """

        def outcome(valid):
            # Shape the answer according to the requested return form.
            return (valid, {"valid": valid}) if get_extra_info else valid

        if not self._use_api_service:
            return outcome(True)

        ror_id = self.normalise(ror_id_full)
        if ror_id is None:
            return outcome(False)

        # Escape the ID so characters such as "?", "#" or "/" cannot alter
        # the request path or query; well-formed ROR IDs are unaffected.
        url = self._api + quote(ror_id, safe="")

        attempts_left = 3
        while attempts_left:
            attempts_left -= 1
            try:
                # self._headers is not defined in this class; presumably it
                # is provided by IdentifierManager -- TODO confirm.
                r = get(url, headers=self._headers, timeout=30)
            except ReadTimeout:
                # Just try again.
                continue
            except ConnectionError:
                # Back off before retrying, but do not sleep needlessly
                # once no attempts remain.
                if attempts_left:
                    sleep(5)
                continue

            if r.status_code == 200:
                r.encoding = "utf-8"
                json_res = loads(r.text)
                found = isinstance(json_res, dict) and bool(json_res.get("id"))
                return outcome(found)
            if 400 <= r.status_code < 500:
                return outcome(False)
            # Any other status code: fall through and retry.

        # All attempts exhausted without a conclusive answer.
        return outcome(False)

    def extra_info(self, api_response, choose_api=None, info_dict=None):
        """Build the extra-information dictionary for an API response.

        Not implemented yet: the arguments are ignored and the result is
        always ``{"valid": True}``.
        """
        # to be implemented
        return {"valid": True}