Coverage for oc_ds_converter / oc_idmanager / ror.py: 73%
98 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2# SPDX-FileCopyrightText: 2024 Elia Rizzetto <elia.rizzetto2@unibo.it>
3# SPDX-FileCopyrightText: 2026 Marta Soricetti <marta.soricetti@unibo.it>
4#
5# SPDX-License-Identifier: ISC
8from json import loads
9from re import match, sub
10from time import sleep
11from typing import Optional
12from urllib.parse import quote, unquote
14from oc_ds_converter.oc_idmanager.base import IdentifierManager
15from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
16from oc_ds_converter.oc_idmanager.oc_data_storage.in_memory_manager import InMemoryStorageManager
17from requests import ReadTimeout, get
18from requests.exceptions import ConnectionError
21class RORManager(IdentifierManager):
22 """This class implements an identifier manager for ROR identifier"""
24 def __init__(self, use_api_service=True, storage_manager:Optional[StorageManager] = None):
25 """PMCID manager constructor."""
26 super(RORManager, self).__init__()
27 self._api = "https://api.ror.org/organizations/"
28 self._use_api_service = use_api_service
29 if storage_manager is None:
30 self.storage_manager = InMemoryStorageManager()
31 else:
32 self.storage_manager = storage_manager
33 self._p = "ror:"
35 def validated_as_id(self, id_string):
36 ror_validation_value = self.storage_manager.get_value(id_string)
37 if isinstance(ror_validation_value, bool):
38 return ror_validation_value
39 else:
40 return None
43 def is_valid(self, ror_id, get_extra_info=False):
44 ror_id = self.normalise(ror_id, include_prefix=True)
46 if ror_id is None:
47 if get_extra_info:
48 return False, {"id":ror_id, "valid":False}
49 return False
51 else:
52 id_validation_value = self.storage_manager.get_value(ror_id)
53 if isinstance(id_validation_value, bool):
54 return id_validation_value
55 else:
56 if get_extra_info:
57 info = self.exists(ror_id, get_extra_info=True)
58 self.storage_manager.set_full_value(ror_id, info[1])
59 return (info[0] and self.syntax_ok(ror_id)), info[1]
60 validity_check = self.syntax_ok(ror_id) and self.exists(ror_id)
61 self.storage_manager.set_value(ror_id, validity_check)
63 return validity_check
65 def normalise(self, id_string, include_prefix=False):
66 try:
67 if id_string.startswith(self._p):
68 ror_id_string = id_string[len(self._p):]
69 else:
70 ror_id_string = id_string
71 # normalize + remove protocol and domain name if they are included in the ID
72 ror_id_string = sub(r"\0+", "", sub(r"^(https?://)?(www\.)?(ror\.org/)?", "", sub(r'\s+', "", unquote(ror_id_string))))
74 return "%s%s" % (
75 self._p if include_prefix else "",
76 ror_id_string.strip().lower(),
77 )
78 except:
79 # Any error in processing the ROR ID will return None
80 return None
82 def syntax_ok(self, id_string):
83 if not id_string.startswith("ror:"):
84 id_string = self._p + id_string
86 # Check if the ID matches the correct format without protocol or domain
87 return True if match(r"^ror:0[a-hj-km-np-tv-z|0-9]{6}[0-9]{2}$", id_string) else False
89 def exists(self, ror_id_full, get_extra_info=False, allow_extra_api=None):
90 valid_bool = True
91 if self._use_api_service:
92 ror_id = self.normalise(ror_id_full)
93 if ror_id is not None:
94 tentative = 3
95 while tentative:
96 tentative -= 1
97 try:
98 r = get(self._api + ror_id, headers=self._headers, timeout=30)
99 if r.status_code == 200:
100 r.encoding = "utf-8"
101 json_res = loads(r.text)
102 if get_extra_info:
103 extra_info_result = {}
104 try:
105 result = True if json_res['id'] else False
106 extra_info_result['valid'] = result
107 return result, extra_info_result
108 except KeyError:
109 extra_info_result["valid"] = False
110 return False, extra_info_result
111 try:
112 return True if json_res['id'] else False
113 except KeyError:
114 return False
116 elif 400 <= r.status_code < 500:
117 if get_extra_info:
118 return False, {"valid": False}
119 return False
120 except ReadTimeout:
121 # Do nothing, just try again
122 pass
123 except ConnectionError:
124 # Sleep 5 seconds, then try again
125 sleep(5)
126 valid_bool = False
127 else:
128 if get_extra_info:
129 return False, {"valid": False}
130 return False
132 if get_extra_info:
133 return valid_bool, {"valid": valid_bool}
134 return valid_bool
136 def extra_info(self, api_response, choose_api=None, info_dict={}):
137 result = {}
138 result["valid"] = True
139 # to be implemented
140 return result