Coverage for ramose / api_manager.py: 97%

188 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-01 13:49 +0000

1# SPDX-FileCopyrightText: 2018-2021 Silvio Peroni <silvio.peroni@unibo.it> 

2# SPDX-FileCopyrightText: 2020-2021 Marilena Daquino <marilena.daquino2@unibo.it> 

3# SPDX-FileCopyrightText: 2022 Davide Brembilla 

4# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it> 

5# SPDX-FileCopyrightText: 2025 Sergei Slinkin 

6# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

7# 

8# SPDX-License-Identifier: ISC 

9 

10from __future__ import annotations 

11 

12import csv 

13from collections import OrderedDict 

14from importlib import import_module 

15from pathlib import Path 

16from re import findall, match, sub 

17from sys import maxsize, path 

18from typing import TYPE_CHECKING, TypedDict 

19from urllib.parse import urlsplit 

20 

21from ramose._constants import FORMAT_PARTS_WITH_MEDIA_TYPE, PARAM_NAME 

22from ramose.cache import ResultCache 

23from ramose.filters import load_filters_config 

24from ramose.hash_format import parse_auth, parse_custom_params, parse_disable_params, read_spec_file 

25from ramose.operation import Operation, OperationConfig 

26 

27if TYPE_CHECKING: 

28 import types 

29 

30 from ramose.filters import FiltersConfig 

31 

32 

class APIConfig(TypedDict):
    """Processed, per-API view of one specification file as built by APIManager."""

    # Normalised operation URL pattern (base URL + regex-expanded path) -> operation items sharing it.
    conf: OrderedDict[str, list[dict[str, str]]]
    # The parsed specification as read from the file: item 0 is the API metadata, the rest are operations.
    conf_json: list[dict[str, str]]
    # Value of the API "url" field; prepended to every operation URL.
    base_url: str
    # SPARQL endpoint handed to operations: the endpoint override when given, else the "endpoint" field.
    tp: str
    # Update endpoint: the endpoint override when given, else the "update_endpoint" field, else "".
    update_endpoint: str
    # Value of the API "base" field; validated to be an absolute URL and used as the public base URL.
    website: str
    # Source name -> URL pairs parsed from the optional "sources" field ("name=url;name=url").
    sources_map: dict[str, str]
    # Parameters disabled for the whole API (parsed from the optional "disable_params" field).
    disable_params: set[str]
    # API-level authentication default (parsed from the optional "auth" field, False when absent).
    auth_required: bool
    # Imported add-on module named by the optional "addon" field, or None.
    addon: types.ModuleType | None
    # Lower-cased "method" field of the API metadata, "get" when absent.
    sparql_http_method: str
    # Path of the specification file this configuration was read from.
    conf_file: str

46 

47 

class APIManager:
    """Registry of the API operations declared in one or more RAMOSE specification files.

    The manager reads every specification file once, keeps the processed configuration of each API
    indexed by its base URL, and resolves incoming API calls to Operation objects (or to an HTTP
    error tuple when no operation applies)."""

    # Fixing max size for CSV
    @staticmethod
    def __max_size_csv() -> None:
        """Raise the CSV field size limit to the largest value accepted by the platform.

        Starts from sys.maxsize and divides the candidate by ten each time the csv module rejects
        it with an OverflowError."""
        max_int = maxsize
        while True:
            try:
                csv.field_size_limit(max_int)
                break
            except OverflowError:  # pragma: no cover
                # Floor division keeps the candidate an exact integer (no float round trip).
                max_int //= 10

    @staticmethod
    def _load_addon(addon_name: str, conf_file: str) -> types.ModuleType:
        """Import the add-on module named in a specification file.

        The name is first resolved relative to the directory of 'conf_file': when a matching
        '<name>.py' file exists there, its directory is made importable and the module is imported
        by its bare name. Otherwise 'addon_name' is imported as a regular module name.

        Raises ImportError (from importlib) when the module cannot be imported."""
        addon_path = (Path(conf_file).parent / addon_name).resolve()
        if addon_path.parent.joinpath(f"{addon_path.name}.py").is_file():
            addon_dir = str(addon_path.parent)
            # Register the directory only once: loading several APIs that share an add-on
            # directory must not grow sys.path with duplicate entries.
            if addon_dir not in path:
                path.append(addon_dir)
            return import_module(addon_path.name)
        return import_module(addon_name)

    @staticmethod
    def _parse_sources(raw_sources: str) -> dict[str, str]:
        """Parse the value of the API 'sources' field ('name=url; name=url; ...') into a dictionary.

        Empty entries (e.g. produced by a trailing ';') are ignored. Only the first '=' of each
        entry separates the name from the URL, so URLs may themselves contain '='.

        Raises ValueError when a non-empty entry contains no '='."""
        sources_map: dict[str, str] = {}
        for raw_pair in raw_sources.split(";"):
            pair = raw_pair.strip()
            if not pair:
                continue
            name, separator, url = pair.partition("=")
            if not separator:
                msg = f"API #sources entry '{pair}' must have the form 'name=url'"
                raise ValueError(msg)
            sources_map[name.strip()] = url.strip()
        return sources_map

    @staticmethod
    def _process_api_metadata(
        conf_json: list[dict[str, str]],
        conf_file: str,
        endpoint_override: str | None,
    ) -> APIConfig:
        """Build the APIConfig of one specification file.

        'conf_json' is the parsed specification: its first item holds the API metadata and the
        remaining items are the operations. 'conf_file' is the path the specification was read from
        (used to locate add-ons). When 'endpoint_override' is truthy it replaces both the SPARQL
        endpoint and the update endpoint declared in the file.

        Raises ValueError when the 'base' field is not an absolute URL or when a 'sources' entry is
        malformed, and KeyError when a mandatory metadata field ('url', 'base', or 'endpoint'
        without an override) is missing."""
        item = conf_json[0]
        base_url = item["url"]
        website = item["base"]
        website_parsed = urlsplit(website)
        if not website_parsed.scheme or not website_parsed.netloc:
            msg = "API #base must be an absolute URL"
            raise ValueError(msg)

        # The override wins over whatever the file declares, for both endpoints.
        tp = endpoint_override or item["endpoint"]
        update_endpoint = endpoint_override or item.get("update_endpoint", "")

        sources_map = APIManager._parse_sources(item["sources"]) if "sources" in item else {}
        disable_params_api = parse_disable_params(item["disable_params"]) if "disable_params" in item else set()
        auth_required = parse_auth(item["auth"]) if "auth" in item else False
        addon = APIManager._load_addon(item["addon"], conf_file) if "addon" in item else None
        sparql_http_method = item["method"].strip().lower() if "method" in item else "get"

        # Group the operations by their normalised URL pattern: operations that share the same
        # pattern (e.g. one per HTTP method) end up in the same list, in declaration order.
        conf: OrderedDict[str, list[dict[str, str]]] = OrderedDict()
        for op_item in conf_json[1:]:
            conf.setdefault(APIManager.nor_api_url(op_item, base_url), []).append(op_item)

        return {
            "conf": conf,
            "conf_json": conf_json,
            "base_url": base_url,
            "tp": tp or "",
            "update_endpoint": update_endpoint,
            "website": website,
            "sources_map": sources_map,
            "disable_params": disable_params_api,
            "auth_required": auth_required,
            "addon": addon,
            "sparql_http_method": sparql_http_method,
            "conf_file": conf_file,
        }

    def __init__(  # noqa: PLR0913
        self,
        conf_files: list[str],
        endpoint_override: str | None = None,
        cache_dir: str | None = None,
        cache_ttl: int = 86400,
        retry_attempts: int = 3,
        retry_wait: float = 0.5,
        retry_backoff: float = 2.0,
    ) -> None:
        """This is the constructor of the APIManager class. It takes as input a list of API configuration
        files, each defined according to the Hash Format or YAML mirror format, and stores the processed
        configuration of every API in 'all_conf', keyed by the base URL of the API. Files for which
        nothing is read are skipped.

        Within each API configuration, the "conf" entry groups the operations by their normalised URL:

            {
                "/api/v1/references/(.+)": [
                    {
                        "sparql": "PREFIX ...",
                        "method": "get",
                        ...
                    },
                    ...
                ],
                ...
            }

        Each key is the base URL followed by the operation URL in which the parameters have been replaced
        by the regular expressions defining them; it is used to understand which operation should be
        called once an API call is done. The value lists the operations declared for that URL, each one
        being the dictionary obtained from the related section of the specification file.

        Optional parameters:
        - endpoint_override: when provided, it replaces the SPARQL endpoint (and the update endpoint)
          defined in the configuration files (useful for staging/production environments);
        - cache_dir: when provided, a ResultCache is created on it and passed to every operation;
        - cache_ttl: default cache time-to-live passed to every operation (presumably in seconds,
          to be confirmed against ResultCache);
        - retry_attempts, retry_wait, retry_backoff: retry settings used by the operations that do
          not declare their own."""
        APIManager.__max_size_csv()

        self._cache = ResultCache(cache_dir) if cache_dir else None
        self._cache_ttl = cache_ttl
        # Filter configurations already loaded from disk, keyed by their resolved file path.
        self._config_cache: dict[str, FiltersConfig] = {}
        self._retry_attempts = retry_attempts
        self._retry_wait = retry_wait
        self._retry_backoff = retry_backoff

        self.all_conf: OrderedDict[str, APIConfig] = OrderedDict()
        self.base_url: list[str] = []
        for conf_file in conf_files:
            conf_json = read_spec_file(conf_file)
            if not conf_json:
                continue
            api_conf = APIManager._process_api_metadata(conf_json, conf_file, endpoint_override)
            self.base_url.append(api_conf["base_url"])
            self.all_conf[api_conf["base_url"]] = api_conf

        self._operation_prefixes = APIManager._build_operation_prefixes(self.all_conf)

    @staticmethod
    def _build_operation_prefixes(
        all_conf: OrderedDict[str, APIConfig],
    ) -> list[tuple[str, str, dict[str, str]]]:
        """Collect, for every parametric operation, the literal URL prefix preceding its first parameter.

        Each entry is '(base URL + template text before the first "{", base URL, operation item)'.
        Operations whose URL template contains no "{" are left out. Entries are ordered from the
        longest prefix to the shortest, so that the most specific operation is tried first."""
        prefixes: list[tuple[str, str, dict[str, str]]] = []
        for base, api_data in all_conf.items():
            for items in api_data["conf"].values():
                for item in items:
                    template = item["url"]
                    brace_pos = template.find("{")
                    if brace_pos != -1:
                        prefixes.append((base + template[:brace_pos], base, item))
        prefixes.sort(key=lambda entry: len(entry[0]), reverse=True)
        return prefixes

    @staticmethod
    def nor_api_url(i: dict[str, str], b: str = "") -> str:
        """This method takes an API operation object and an optional base URL (e.g. "/api/v1") as input
        and returns the URL composed by the base URL plus the API URL normalised according to specific rules.
        In particular, these normalisation rules take the operation URL (e.g. "#url /citations/{oci}") and the
        specification of the shape of all the parameters between brackets in the URL
        (e.g. "#oci str([0-9]+-[0-9]+)"), and return a new operation URL where the parameters have been
        substituted with the regular expressions defining them (e.g. "/citations/([0-9]+-[0-9]+)").
        A parameter without a specification is treated as "str(.+)". This URL will be used by RAMOSE for
        matching the particular API calls with the specific operation to execute."""
        result = i["url"]

        for term in findall(PARAM_NAME, result):
            shape = i.get(term, "str(.+)")
            # Drop the type name in front of the parenthesised regular expression, keeping "(...)".
            result = result.replace(f"{{{term}}}", sub(r"^[^\(]+(\(.+\))$", r"\1", shape))

        return f"{b}{result}"

    def best_match(self, u: str, method: str = "get") -> tuple[APIConfig | None, str | None, dict[str, str] | None]:
        """This method takes the URL of an API call and the HTTP method as input and finds the API
        configuration, the normalised operation URL and the operation that best match the API call, if any.
        The query string of the URL, if present, is ignored. When the path matches but no operation declares
        the requested method, the first operation for that path is returned so that the 405 check can reject
        it. When nothing matches, a tuple of three None values is returned."""
        cur_u = sub(r"\?.*$", "", u)
        requested = method.lower()
        for base_url, conf in self.all_conf.items():
            if not u.startswith(base_url):
                continue
            for pat, items in conf["conf"].items():
                if not match(f"^{pat}$", cur_u):
                    continue
                for op_item in items:
                    # The "method" field may list several space-separated HTTP methods.
                    if requested in op_item["method"].split():
                        return conf, pat, op_item
                return conf, pat, items[0]
        return None, None, None

    @staticmethod
    def _parse_format_map(op_conf: dict[str, str]) -> tuple[dict[str, str], dict[str, str]]:
        """Parse the optional "format" field of an operation.

        The field (a string, or a list of strings) contains ';'-separated entries, each made of
        comma-separated fields. The first returned dictionary maps the first field of every entry
        to its second field (presumably the format name and its converter, to be confirmed against
        Operation); the second dictionary maps the first field to the third one (the media type)
        when the entry has at least FORMAT_PARTS_WITH_MEDIA_TYPE fields and that field is not empty.

        Raises IndexError when a non-empty entry has a single field only."""
        op_format_map: dict[str, str] = {}
        op_media_types: dict[str, str] = {}
        if "format" not in op_conf:
            return op_format_map, op_media_types
        fm_val = op_conf["format"]
        fm_list = fm_val if isinstance(fm_val, list) else [fm_val]
        for fm in fm_list:
            for raw_part in fm.split(";"):
                part = raw_part.strip()
                if not part:
                    continue
                fields = [field.strip() for field in part.split(",")]
                op_format_map[fields[0]] = fields[1]
                if len(fields) >= FORMAT_PARTS_WITH_MEDIA_TYPE and fields[2]:
                    op_media_types[fields[0]] = fields[2]
        return op_format_map, op_media_types

    def _retry_config_for_operation(self, op_conf: dict[str, str]) -> tuple[int, float, float]:
        """Return '(retry_attempts, retry_wait, retry_backoff)' for an operation.

        Each value comes from the homonymous field of the operation when present (converted to
        int/float), otherwise from the default given to the constructor."""
        retry_attempts = int(op_conf["retry_attempts"]) if "retry_attempts" in op_conf else self._retry_attempts
        retry_wait = float(op_conf["retry_wait"]) if "retry_wait" in op_conf else self._retry_wait
        retry_backoff = float(op_conf["retry_backoff"]) if "retry_backoff" in op_conf else self._retry_backoff
        return retry_attempts, retry_wait, retry_backoff

    def _resolve_custom_param_configs(
        self, conf: APIConfig, custom_params_map: dict[str, dict[str, str]]
    ) -> dict[str, FiltersConfig]:
        """Load the filter configurations referenced by the custom parameters of an operation.

        Only the parameters whose phase is "preprocess" and whose handler is a '.yaml'/'.yml' file are
        considered. The handler path is resolved relative to the directory of the specification file,
        and each file is loaded once per manager and then reused."""
        result: dict[str, FiltersConfig] = {}
        for name, param_conf in custom_params_map.items():
            handler = param_conf["handler"]
            if param_conf["phase"] != "preprocess" or not handler.endswith((".yaml", ".yml")):
                continue
            resolved = str((Path(conf["conf_file"]).parent / handler).resolve())
            if resolved not in self._config_cache:
                self._config_cache[resolved] = load_filters_config(resolved)
            result[name] = self._config_cache[resolved]
        return result

    def get_op(self, op_complete_url: str, method: str = "get") -> Operation | tuple[int, str, str]:
        """This method returns a new object of type Operation which represents the operation specified by
        the input URL (parameter 'op_complete_url') and the HTTP method. In case no operation can be found
        by checking the configuration files available in the APIManager, a tuple with an HTTP error code,
        a message and the content type of the message is returned instead: a 400 error when the URL starts
        like a parametric operation but its parameter value is missing or not valid, a 404 error
        otherwise."""
        op_url = urlsplit(op_complete_url).path

        conf, op, op_conf = self.best_match(op_url, method)
        if conf is not None and op is not None and op_conf is not None:
            custom_params_map = parse_custom_params(op_conf["custom_params"]) if "custom_params" in op_conf else {}

            # Parameters disabled at API level and at operation level add up.
            api_disabled = conf["disable_params"]
            op_disabled = parse_disable_params(op_conf["disable_params"]) if "disable_params" in op_conf else set()
            effective_disabled = api_disabled | op_disabled

            # An operation-level "auth" field overrides the API-level default.
            requires_auth = parse_auth(op_conf["auth"]) if "auth" in op_conf else conf["auth_required"]

            op_format_map, op_format_media_types = APIManager._parse_format_map(op_conf)
            retry_attempts, retry_wait, retry_backoff = self._retry_config_for_operation(op_conf)
            config = OperationConfig(
                sparql_endpoint=conf["tp"],
                update_endpoint=conf["update_endpoint"],
                sparql_http_method=conf["sparql_http_method"],
                addon=conf["addon"],
                format_map=op_format_map,
                format_media_types=op_format_media_types,
                sources_map=conf["sources_map"],
                custom_params=custom_params_map,
                disabled_params=effective_disabled,
                requires_auth=requires_auth,
                cache=self._cache,
                default_cache_ttl=self._cache_ttl,
                custom_param_configs=self._resolve_custom_param_configs(conf, custom_params_map),
                public_base_url=conf["website"],
                retry_attempts=retry_attempts,
                retry_wait=retry_wait,
                retry_backoff=retry_backoff,
            )
            return Operation(op_complete_url, op, op_conf, config)

        # No operation matched: check whether the URL at least starts like a parametric operation,
        # so that a more helpful 400 error can be returned instead of a generic 404.
        for prefix, base_url, item in self._operation_prefixes:
            if not op_url.startswith(prefix):
                continue
            template = item["url"]
            param_names = findall(PARAM_NAME, template)
            if not param_names:
                # The template contains "{" but no recognisable parameter name: there is no
                # parameter to report, so keep looking rather than failing with an IndexError.
                continue
            full_template = base_url + template
            param_value = op_url[len(prefix) :]
            if not param_value:
                msg = (
                    f"HTTP status code 400: the operation '{full_template}' "
                    f"requires a value for parameter '{param_names[0]}'"
                )
            else:
                msg = (
                    f"HTTP status code 400: the value '{param_value}' is not valid for parameter "
                    f"'{param_names[0]}' in operation '{full_template}'"
                )
            if "call" in item:
                msg += f". Example: {base_url}{item['call']}"
            return 400, msg, "text/plain"

        return 404, "HTTP status code 404: the operation requested does not exist", "text/plain"

323 

324 return 404, "HTTP status code 404: the operation requested does not exist", "text/plain"