Coverage for ramose / api_manager.py: 97%
188 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-01 13:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-01 13:49 +0000
1# SPDX-FileCopyrightText: 2018-2021 Silvio Peroni <silvio.peroni@unibo.it>
2# SPDX-FileCopyrightText: 2020-2021 Marilena Daquino <marilena.daquino2@unibo.it>
3# SPDX-FileCopyrightText: 2022 Davide Brembilla
4# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it>
5# SPDX-FileCopyrightText: 2025 Sergei Slinkin
6# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
7#
8# SPDX-License-Identifier: ISC
10from __future__ import annotations
12import csv
13from collections import OrderedDict
14from importlib import import_module
15from pathlib import Path
16from re import findall, match, sub
17from sys import maxsize, path
18from typing import TYPE_CHECKING, TypedDict
19from urllib.parse import urlsplit
21from ramose._constants import FORMAT_PARTS_WITH_MEDIA_TYPE, PARAM_NAME
22from ramose.cache import ResultCache
23from ramose.filters import load_filters_config
24from ramose.hash_format import parse_auth, parse_custom_params, parse_disable_params, read_spec_file
25from ramose.operation import Operation, OperationConfig
27if TYPE_CHECKING:
28 import types
30 from ramose.filters import FiltersConfig
class APIConfig(TypedDict):
    """Configuration of a single API, as assembled from one specification file.

    Instances are produced by ``APIManager._process_api_metadata`` from the parsed
    specification: the first item of the specification describes the API itself,
    the remaining items describe its operations.
    """

    # Operation items grouped by their normalised URL pattern (base URL + operation
    # URL with parameters replaced by their regular expressions).
    conf: OrderedDict[str, list[dict[str, str]]]
    # The complete parsed specification (API item followed by the operation items).
    conf_json: list[dict[str, str]]
    # Value of the "url" key of the API item; used as prefix of every operation URL.
    base_url: str
    # SPARQL endpoint: the endpoint override when given, otherwise the "endpoint" key.
    tp: str
    # Update endpoint: the endpoint override when given, otherwise the optional
    # "update_endpoint" key, otherwise the empty string.
    update_endpoint: str
    # Value of the "base" key of the API item; validated to be an absolute URL.
    website: str
    # Mapping parsed from the optional "sources" key ("name=url" pairs separated by ";").
    sources_map: dict[str, str]
    # Parameters disabled at API level (parsed from the optional "disable_params" key).
    disable_params: set[str]
    # API-level authentication flag (parsed from the optional "auth" key, default False).
    auth_required: bool
    # Module imported from the optional "addon" key, or None when no addon is declared.
    addon: types.ModuleType | None
    # Lower-cased value of the optional "method" key of the API item (default "get").
    sparql_http_method: str
    # Path of the specification file this configuration was read from.
    conf_file: str
class APIManager:
    """Load API specification files and resolve API calls to their operations.

    The manager keeps one ``APIConfig`` per API base URL (``all_conf``), the list of
    base URLs in loading order (``base_url``), and a list of literal URL prefixes
    used to build helpful 400 messages when a call reaches a known operation with
    an invalid or missing parameter value.
    """

    # Fixing max size for CSV
    @staticmethod
    def __max_size_csv() -> None:
        """Raise the CSV field size limit to the largest value the platform accepts.

        Starts from ``sys.maxsize`` and divides the candidate by ten each time
        ``csv.field_size_limit`` rejects it with ``OverflowError``.
        """
        limit = maxsize
        while True:
            try:
                csv.field_size_limit(limit)
            except OverflowError:  # pragma: no cover
                # Integer floor division keeps the candidate an exact int instead of
                # rounding it through a float.
                limit //= 10
            else:
                break

    @staticmethod
    def _load_addon(addon_name: str, conf_file: str) -> types.ModuleType:
        """Import the addon module declared by a specification file.

        The addon name is first resolved relative to the directory of ``conf_file``:
        when ``<name>.py`` exists there, its directory is made importable and the
        module is imported by its file name. Otherwise ``addon_name`` is imported
        as a regular module name.
        """
        addon_path = (Path(conf_file).parent / addon_name).resolve()
        if addon_path.parent.joinpath(f"{addon_path.name}.py").is_file():
            addon_dir = str(addon_path.parent)
            # Avoid growing sys.path with duplicates when several specifications
            # (or several managers) load addons from the same directory.
            if addon_dir not in path:
                path.append(addon_dir)
            return import_module(addon_path.name)
        return import_module(addon_name)

    @staticmethod
    def _parse_sources(raw_sources: str) -> dict[str, str]:
        """Parse a ';'-separated list of 'name=url' pairs into a dictionary.

        Empty entries are ignored; only the first '=' of each entry separates the
        name from the URL. Raises ``ValueError`` for an entry without '='.
        """
        sources_map: dict[str, str] = {}
        for raw_pair in raw_sources.split(";"):
            pair = raw_pair.strip()
            if not pair:
                continue
            name, separator, url = pair.partition("=")
            if not separator:
                msg = f"API #sources entry '{pair}' must have the form 'name=url'"
                raise ValueError(msg)
            sources_map[name.strip()] = url.strip()
        return sources_map

    @staticmethod
    def _process_api_metadata(
        conf_json: list[dict[str, str]],
        conf_file: str,
        endpoint_override: str | None,
    ) -> APIConfig:
        """Build the ``APIConfig`` of one API from its parsed specification.

        ``conf_json[0]`` is the API item, the following items are the operations.
        When ``endpoint_override`` is given it replaces both the SPARQL endpoint and
        the update endpoint declared in the specification.

        Raises ``ValueError`` when the "base" value is not an absolute URL or when a
        "sources" entry is not of the form 'name=url'.
        """
        item = conf_json[0]
        base_url = item["url"]
        website = item["base"]
        website_parsed = urlsplit(website)
        if not website_parsed.scheme or not website_parsed.netloc:
            msg = "API #base must be an absolute URL"
            raise ValueError(msg)

        tp = endpoint_override or item["endpoint"]
        update_endpoint = endpoint_override or ""
        if not endpoint_override and "update_endpoint" in item:
            update_endpoint = item["update_endpoint"]

        sources_map = APIManager._parse_sources(item["sources"]) if "sources" in item else {}
        disable_params_api = parse_disable_params(item["disable_params"]) if "disable_params" in item else set()
        auth_required = parse_auth(item["auth"]) if "auth" in item else False
        addon = APIManager._load_addon(item["addon"], conf_file) if "addon" in item else None
        sparql_http_method = item["method"].strip().lower() if "method" in item else "get"

        # Operations sharing the same normalised URL (e.g. one per HTTP method) are
        # grouped under the same key, preserving specification order.
        conf: OrderedDict[str, list[dict[str, str]]] = OrderedDict()
        for op_item in conf_json[1:]:
            conf.setdefault(APIManager.nor_api_url(op_item, base_url), []).append(op_item)

        return {
            "conf": conf,
            "conf_json": conf_json,
            "base_url": base_url,
            "tp": tp or "",
            "update_endpoint": update_endpoint,
            "website": website,
            "sources_map": sources_map,
            "disable_params": disable_params_api,
            "auth_required": auth_required,
            "addon": addon,
            "sparql_http_method": sparql_http_method,
            "conf_file": conf_file,
        }

    def __init__(  # noqa: PLR0913
        self,
        conf_files: list[str],
        endpoint_override: str | None = None,
        cache_dir: str | None = None,
        cache_ttl: int = 86400,
        retry_attempts: int = 3,
        retry_wait: float = 0.5,
        retry_backoff: float = 2.0,
    ) -> None:
        """This is the constructor of the APIManager class. It takes in input a list of API configuration files, each
        defined according to the Hash Format or YAML mirror format, and stores all the operations defined within a
        dictionary. Optionally, an endpoint_override parameter can be provided to override the SPARQL endpoint defined
        in the configuration files (useful for staging/production environments).
        The structure of each item in the dictionary of the operations is defined as follows:

        {
            "/api/v1/references/(.+)": {
                "sparql": "PREFIX ...",
                "method": "get",
                ...
            },
            ...
        }

        In particular, each key in the dictionary identifies the full URL of a particular API operation, and it is
        used so as to understand which operation should be called once an API call is done. The object associated
        as value of this key is the transformation of the related operation defined in the input spec file into a
        dictionary.

        In addition, it also defines additional structure, such as the functions to be used for interpreting the
        values returned by a SPARQL query, some operations that can be used for filtering the results, and the
        HTTP methods to call for making the request to the SPARQL endpoint specified in the configuration file.

        A result cache is created only when cache_dir is provided; cache_ttl is handed to every operation as its
        default cache TTL. The retry_* values are the defaults used for operations that do not declare their own
        "retry_attempts", "retry_wait" or "retry_backoff". Configuration files that yield an empty specification
        are skipped."""
        APIManager.__max_size_csv()

        self._cache = ResultCache(cache_dir) if cache_dir else None
        self._cache_ttl = cache_ttl
        self._config_cache: dict[str, FiltersConfig] = {}
        self._retry_attempts = retry_attempts
        self._retry_wait = retry_wait
        self._retry_backoff = retry_backoff

        self.all_conf: OrderedDict[str, APIConfig] = OrderedDict()
        self.base_url: list[str] = []
        for conf_file in conf_files:
            conf_json = read_spec_file(conf_file)
            if not conf_json:
                continue
            api_conf = APIManager._process_api_metadata(conf_json, conf_file, endpoint_override)
            self.base_url.append(api_conf["base_url"])
            self.all_conf[api_conf["base_url"]] = api_conf

        self._operation_prefixes = APIManager._build_operation_prefixes(self.all_conf)

    @staticmethod
    def _build_operation_prefixes(
        all_conf: OrderedDict[str, APIConfig],
    ) -> list[tuple[str, str, dict[str, str]]]:
        """Collect the literal prefix of every parameterised operation URL.

        For each operation whose URL template contains a '{', a tuple
        ``(base URL + template text before the first '{', base URL, operation item)``
        is produced. The result is ordered from the longest prefix to the shortest,
        so that the most specific operation is found first.
        """
        prefixes: list[tuple[str, str, dict[str, str]]] = []
        for base, api_data in all_conf.items():
            for items in api_data["conf"].values():
                for item in items:
                    template = item["url"]
                    brace_pos = template.find("{")
                    if brace_pos != -1:
                        prefixes.append((base + template[:brace_pos], base, item))
        prefixes.sort(key=lambda entry: len(entry[0]), reverse=True)
        return prefixes

    @staticmethod
    def nor_api_url(i: dict[str, str], b: str = "") -> str:
        """This method takes an API operation object and an optional base URL (e.g. "/api/v1") as input
        and returns the URL composed by the base URL plus the API URL normalised according to specific rules. In
        particular, these normalisation rules take the operation URL (e.g. "#url /citations/{oci}") and the
        specification of the shape of all the parameters between brackets in the URL (e.g. "#oci str([0-9]+-[0-9]+)"),
        and return a new operation URL where the parameters have been substituted with the regular expressions
        defining them (e.g. "/citations/([0-9]+-[0-9]+)"). This URL will be used by RAMOSE for matching the
        particular API calls with the specific operation to execute. A parameter without an explicit shape in the
        operation object is treated as "str(.+)"."""
        result = i["url"]

        for term in findall(PARAM_NAME, result):
            shape = i.get(term, "str(.+)")
            # Keep only the parenthesised regular expression of the shape, dropping
            # the leading type name (e.g. "str([0-9]+)" -> "([0-9]+)").
            result = result.replace(f"{{{term}}}", sub(r"^[^\(]+(\(.+\))$", r"\1", shape))

        return f"{b}{result}"

    def best_match(self, u: str, method: str = "get") -> tuple[APIConfig | None, str | None, dict[str, str] | None]:
        """This method takes an URL of an API call and the HTTP method in input and finds the API operation URL,
        the related configuration and the operation matching the requested method that best match with the API
        call, if any. When the path matches but no operation declares the requested method, the first operation
        for that path is returned so that the 405 check can reject it. When nothing matches, a tuple of three
        None values is returned."""
        # The query string never takes part in the matching.
        cur_u = sub(r"\?.*$", "", u)
        requested = method.lower()
        for base_url, conf in self.all_conf.items():
            if not cur_u.startswith(base_url):
                continue
            for pat, items in conf["conf"].items():
                if not match(f"^{pat}$", cur_u):
                    continue
                for op_item in items:
                    if requested in op_item["method"].split():
                        return conf, pat, op_item
                return conf, pat, items[0]
        return None, None, None

    @staticmethod
    def _parse_format_map(op_conf: dict[str, str]) -> tuple[dict[str, str], dict[str, str]]:
        """Parse the optional "format" entry of an operation.

        The entry (a string, or a list of strings) contains ';'-separated parts, each
        made of comma-separated fields: the first field is the format name, the
        second the value associated with it, and an optional non-empty third field
        the media type. Returns the pair ``(name -> second field, name -> media type)``;
        both dictionaries are empty when the operation declares no "format".
        """
        op_format_map: dict[str, str] = {}
        op_media_types: dict[str, str] = {}
        if "format" not in op_conf:
            return op_format_map, op_media_types

        fm_val = op_conf["format"]
        fm_list = fm_val if isinstance(fm_val, list) else [fm_val]
        for fm in fm_list:
            for raw_part in fm.split(";"):
                part = raw_part.strip()
                if not part:
                    continue
                fields = [field.strip() for field in part.split(",")]
                op_format_map[fields[0]] = fields[1]
                if len(fields) >= FORMAT_PARTS_WITH_MEDIA_TYPE and fields[2]:
                    op_media_types[fields[0]] = fields[2]
        return op_format_map, op_media_types

    def _retry_config_for_operation(self, op_conf: dict[str, str]) -> tuple[int, float, float]:
        """Return ``(retry_attempts, retry_wait, retry_backoff)`` for an operation.

        Values declared by the operation are converted from their textual form and
        take precedence over the defaults given to the constructor.
        """
        retry_attempts = int(op_conf["retry_attempts"]) if "retry_attempts" in op_conf else self._retry_attempts
        retry_wait = float(op_conf["retry_wait"]) if "retry_wait" in op_conf else self._retry_wait
        retry_backoff = float(op_conf["retry_backoff"]) if "retry_backoff" in op_conf else self._retry_backoff
        return retry_attempts, retry_wait, retry_backoff

    def _resolve_custom_param_configs(
        self, conf: APIConfig, custom_params_map: dict[str, dict[str, str]]
    ) -> dict[str, FiltersConfig]:
        """Load the filter configurations referenced by custom parameters.

        Only parameters in the "preprocess" phase whose handler is a YAML file
        (".yaml" or ".yml") are considered. Handler paths are resolved relative to
        the directory of the API specification file, and each resolved file is
        loaded once and then reused from ``self._config_cache``.
        """
        result: dict[str, FiltersConfig] = {}
        for name, param_conf in custom_params_map.items():
            handler = param_conf["handler"]
            if param_conf["phase"] != "preprocess" or not handler.endswith((".yaml", ".yml")):
                continue
            resolved = str((Path(conf["conf_file"]).parent / handler).resolve())
            if resolved not in self._config_cache:
                self._config_cache[resolved] = load_filters_config(resolved)
            result[name] = self._config_cache[resolved]
        return result

    def get_op(self, op_complete_url: str, method: str = "get") -> Operation | tuple[int, str, str]:
        """This method returns a new object of type Operation which represents the operation specified by
        the input URL (parameter 'op_complete_url') and the HTTP method. In case no operation can be found
        by checking the configuration files available in the APIManager, a tuple with an HTTP error
        code, a message and the content type of the message is returned instead: 400 when the URL starts with
        the literal prefix of a known parameterised operation (missing or invalid parameter value), 404
        otherwise."""
        url_parsed = urlsplit(op_complete_url)
        op_url = url_parsed.path

        conf, op, op_conf = self.best_match(op_url, method)
        if conf is not None and op is not None and op_conf is not None:
            custom_params_map = parse_custom_params(op_conf["custom_params"]) if "custom_params" in op_conf else {}

            # Parameters disabled at API level and at operation level add up.
            api_disabled = conf["disable_params"]
            op_disabled = parse_disable_params(op_conf["disable_params"]) if "disable_params" in op_conf else set()
            effective_disabled = api_disabled | op_disabled

            # An operation-level "auth" replaces the API-level setting.
            requires_auth = parse_auth(op_conf["auth"]) if "auth" in op_conf else conf["auth_required"]

            op_format_map, op_format_media_types = APIManager._parse_format_map(op_conf)
            retry_attempts, retry_wait, retry_backoff = self._retry_config_for_operation(op_conf)
            config = OperationConfig(
                sparql_endpoint=conf["tp"],
                update_endpoint=conf["update_endpoint"],
                sparql_http_method=conf["sparql_http_method"],
                addon=conf["addon"],
                format_map=op_format_map,
                format_media_types=op_format_media_types,
                sources_map=conf["sources_map"],
                custom_params=custom_params_map,
                disabled_params=effective_disabled,
                requires_auth=requires_auth,
                cache=self._cache,
                default_cache_ttl=self._cache_ttl,
                custom_param_configs=self._resolve_custom_param_configs(conf, custom_params_map),
                public_base_url=conf["website"],
                retry_attempts=retry_attempts,
                retry_wait=retry_wait,
                retry_backoff=retry_backoff,
            )
            return Operation(op_complete_url, op, op_conf, config)

        # No operation matched: look for the most specific parameterised operation
        # whose literal prefix starts the requested path, to explain what is wrong.
        for prefix, base_url, item in self._operation_prefixes:
            if not op_url.startswith(prefix):
                continue
            template = item["url"]
            param_names = findall(PARAM_NAME, template)
            if not param_names:
                # The template contains a '{' but no recognisable parameter name:
                # there is nothing meaningful to report for it.
                continue
            full_template = base_url + template
            param_value = op_url[len(prefix) :]
            if not param_value:
                msg = (
                    f"HTTP status code 400: the operation '{full_template}' "
                    f"requires a value for parameter '{param_names[0]}'"
                )
            else:
                msg = (
                    f"HTTP status code 400: the value '{param_value}' is not valid for parameter "
                    f"'{param_names[0]}' in operation '{full_template}'"
                )
            if "call" in item:
                msg += f". Example: {base_url}{item['call']}"
            return 400, msg, "text/plain"

        return 404, "HTTP status code 404: the operation requested does not exist", "text/plain"