Coverage for ramose / openapi_documentation.py: 99%
247 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-15 15:58 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-15 15:58 +0000
1# SPDX-FileCopyrightText: 2018-2021 Silvio Peroni <silvio.peroni@unibo.it>
2# SPDX-FileCopyrightText: 2020-2021 Marilena Daquino <marilena.daquino2@unibo.it>
3# SPDX-FileCopyrightText: 2022 Davide Brembilla
4# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it>
5# SPDX-FileCopyrightText: 2025 Sergei Slinkin
6# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
7#
8# SPDX-License-Identifier: ISC
10import json
11import re
12from collections import OrderedDict
13from pathlib import Path
14from re import findall, split
15from urllib.parse import quote
17import yaml
19from ramose._constants import FIELD_TYPE_RE, PARAM_NAME
20from ramose.documentation import DocumentationHandler
21from ramose.hash_format import parse_custom_params, parse_disable_params
24class OpenAPIDocumentationHandler(DocumentationHandler):
25 """
26 Export RAMOSE .hf configuration(s) to an OpenAPI 3.0 YAML specification.
28 Notes:
29 - OpenAPI is a surface contract. RAMOSE implementation details (endpoint, addon, method,
30 preprocess, postprocess) are intentionally omitted as they are not meaningful to API consumers.
31 """
33 # -------------------------
34 # Small utilities
35 # -------------------------
36 def _normalize_base_url(self, base_url: str) -> str:
37 return base_url.removeprefix("/")
39 def _get_conf(self, base_url: str | None = None):
40 if base_url is None:
41 first_key = next(iter(self.conf_doc))
42 return self.conf_doc[first_key]
43 normalized = self._normalize_base_url(base_url)
44 return self.conf_doc["/" + normalized]
46 def _schema_for_ramose_type(self, t):
47 t = (t or "str").strip().lower()
48 if t == "int":
49 return {"type": "integer"}
50 if t == "float":
51 return {"type": "number"}
52 if t == "datetime":
53 return {"type": "string", "format": "date-time"}
54 if t == "duration":
55 # OpenAPI doesn't standardize duration; still useful as hint.
56 return {"type": "string", "format": "duration"}
57 return {"type": "string"}
59 def _parse_param_type_shape(self, s):
60 # expected "type(regex)"
61 try:
62 t, shape = findall(r"^\s*([^\(]+)\((.+)\)\s*$", s)[0]
63 return t.strip(), shape.strip()
64 except (IndexError, ValueError):
65 return "str", ".+"
67 def _guess_contact(self, contacts_value):
68 """
69 Table 1: '#contacts <contact_url>' but in practice it's often an email.
70 Prefer OpenAPI contact.email when it looks like an email.
71 """
72 if not contacts_value:
73 return None
74 c = str(contacts_value).strip()
75 if "@" in c and " " not in c and "/" not in c:
76 return {"email": c}
77 return {"name": c}
79 def _clean_text(self, v):
80 """
81 Normalize text coming from .hf parsing so Swagger/ YAML render nicely:
82 - remove wrapping quotes if they were included as part of the value
83 - turn literal '\\n' into real newlines
84 - trim whitespace
85 """
86 if v is None:
87 return None
88 s = str(v).strip()
89 # Strip wrapping quotes if parser stored them as part of the value
90 if len(s) >= 2 and ((s[0] == s[-1] == '"') or (s[0] == s[-1] == "'")):
91 s = s[1:-1].strip()
92 # Convert literal backslash-n sequences to actual newlines
93 return s.replace("\\n", "\n")
95 def _param_hint_from_preprocess(self, preprocess_str, param_name):
96 """
97 Table 2: preprocess functions like 'lower(doi) --> split_dois(dois)'.
98 Not formalizable in OpenAPI, but helpful as a hint.
99 """
100 if not preprocess_str:
101 return ""
102 s = str(preprocess_str)
103 # Any function call mentioning the param inside (...)?
104 if re.search(r"\([^)]*\b" + re.escape(param_name) + r"\b[^)]*\)", s):
105 return f"Note: input is pre-processed by RAMOSE: {s}"
106 return ""
108 def _try_parse_output_json(self, output_json_value):
109 """
110 Table 2: '#output_json <ex_response>' (JSON example).
111 """
112 if not output_json_value:
113 return None
114 try:
115 return json.loads(output_json_value)
116 except (ValueError, TypeError):
117 return None
119 # -------------------------
120 # Formats / media-types
121 # -------------------------
122 def _collect_format_tokens(self, conf):
123 # always supported by RAMOSE docs
124 formats = {"csv", "json"}
125 for op in conf["conf_json"][1:]:
126 if "format" in op:
127 fm_val = op["format"]
128 fm_list = fm_val if isinstance(fm_val, list) else [fm_val]
129 for fm in fm_list:
130 for raw_part in str(fm).split(";"):
131 part = raw_part.strip()
132 if not part:
133 continue
134 # expected "fmt,func"
135 fmt = part.split(",", 1)[0].strip()
136 if fmt:
137 formats.add(fmt)
138 return sorted(formats)
140 def _media_type_for_format(self, fmt):
141 fmt = (fmt or "").strip().lower()
142 mapping = {
143 "json": "application/json",
144 "csv": "text/csv",
145 "xml": "application/xml",
146 "rdfxml": "application/rdf+xml",
147 "rdf+xml": "application/rdf+xml",
148 "ttl": "text/turtle",
149 "turtle": "text/turtle",
150 "nt": "application/n-triples",
151 "ntriples": "application/n-triples",
152 "n-triples": "application/n-triples",
153 "nq": "application/n-quads",
154 "n-quads": "application/n-quads",
155 "trig": "application/trig",
156 }
157 return mapping.get(fmt)
159 def _build_response_content(self, ok_schema, formats_enum, ok_example=None, err_schema_ref=None):
160 """
161 Build OpenAPI 'content' dict for responses based on supported formats.
162 JSON gets structured schema. Others are represented as string payloads.
163 If err_schema_ref is provided, also returns an error-content dict.
164 """
165 content = OrderedDict()
167 content["application/json"] = {"schema": ok_schema}
168 if ok_example is not None:
169 content["application/json"]["examples"] = {"example": {"value": ok_example}}
171 content["text/csv"] = {"schema": {"type": "string"}}
173 # Other formats discovered in .hf (#format)
174 for fmt in formats_enum or []:
175 mt = self._media_type_for_format(fmt)
176 if mt is None or mt in content:
177 continue
178 content[mt] = {"schema": {"type": "string"}}
180 if err_schema_ref:
181 err_content = OrderedDict()
182 err_content["application/json"] = {"schema": {"$ref": err_schema_ref}}
183 err_content["text/csv"] = {"schema": {"type": "string"}}
184 for fmt in formats_enum or []:
185 mt = self._media_type_for_format(fmt)
186 if mt is None or mt in err_content:
187 continue
188 err_content[mt] = {"schema": {"type": "string"}}
189 return content, err_content
191 return content
193 # -------------------------
194 # Examples from #call
195 # -------------------------
196 def _extract_param_examples_from_call(self, path_template, call_value):
197 """
198 Given a template like '/metadata/{dois}' and a call like
199 '/metadata/10.1/abc__10.2/xyz', return {'dois': '10.1/abc__10.2/xyz'}.
201 IMPORTANT: RAMOSE allows slashes inside the last param because it routes
202 everything via <path:api_url>. OpenAPI tooling typically expects these
203 slashes to be URL-encoded in examples.
204 """
205 if not call_value:
206 return {}
208 call_path = str(call_value).split("?", 1)[0].strip()
210 parts = path_template.split("/")
211 re_parts = []
213 # Allow '/' inside the LAST parameter segment (captures the rest of the path)
214 last_index = len(parts) - 1
216 for i, part in enumerate(parts):
217 if part.startswith("{") and part.endswith("}"):
218 name = part[1:-1]
219 if i == last_index:
220 # last param: capture everything to end, including slashes
221 re_parts.append(rf"(?P<{name}>.+)")
222 else:
223 # middle params: standard segment (no slash)
224 re_parts.append(rf"(?P<{name}>[^/]+)")
225 else:
226 re_parts.append(re.escape(part))
228 pat = "^" + "/".join(re_parts) + "$"
229 m = re.match(pat, call_path)
230 if not m:
231 return {}
232 return {k: v for k, v in m.groupdict().items() if v is not None}
234 # -------------------------
235 # Schema from field_type
236 # -------------------------
def _build_row_schema_from_field_type(self, field_type_str):
    """
    Build the object schema for one result row from a field-type declaration.

    Every (type, field) pair found by FIELD_TYPE_RE becomes a property named
    after the field, in order of appearance. An empty or missing declaration
    yields an object schema with no properties.
    """
    properties = OrderedDict(
        (field_name, self._schema_for_ramose_type(type_name))
        for type_name, field_name in findall(FIELD_TYPE_RE, field_type_str or "")
    )
    return {"type": "object", "properties": properties}
243 # -------------------------
244 # Main builder
245 # -------------------------
246 def _build_info(self, api_meta):
247 """Build the OpenAPI info object from API metadata."""
248 info: OrderedDict[str, object] = OrderedDict()
249 info["title"] = api_meta.get("title", "RAMOSE API")
250 info["version"] = api_meta.get("version", "0.0.0")
251 if "description" in api_meta:
252 info["description"] = api_meta["description"]
253 if "license" in api_meta:
254 info["license"] = {"name": api_meta["license"]}
255 if "contacts" in api_meta:
256 contact_obj = self._guess_contact(api_meta.get("contacts"))
257 if contact_obj:
258 info["contact"] = contact_obj
259 return info
261 @staticmethod
262 def _build_common_parameters(formats_enum):
263 """Build the shared query parameter definitions."""
264 return {
265 "require": {
266 "name": "require",
267 "in": "query",
268 "description": "Remove rows that have an empty value in the specified field. Repeatable.",
269 "required": False,
270 "style": "form",
271 "explode": True,
272 "schema": {"type": "array", "items": {"type": "string"}},
273 },
274 "filter": {
275 "name": "filter",
276 "in": "query",
277 "description": (
278 "Filter rows. Repeatable.\n\n"
279 "Syntax: `field:opvalue` where `op` is one of `=`, `<`, `>`.\n"
280 "If `op` is omitted, `value` is treated as a regex."
281 ),
282 "required": False,
283 "style": "form",
284 "explode": True,
285 "schema": {"type": "array", "items": {"type": "string"}},
286 },
287 "sort": {
288 "name": "sort",
289 "in": "query",
290 "description": "Sort rows. Syntax: asc(field) or desc(field). Repeatable.",
291 "required": False,
292 "style": "form",
293 "explode": True,
294 "schema": {"type": "array", "items": {"type": "string"}},
295 },
296 "format": {
297 "name": "format",
298 "in": "query",
299 "description": "Force output format (overrides Accept header).",
300 "required": False,
301 "schema": {"type": "string", "enum": formats_enum},
302 },
303 "json": {
304 "name": "json",
305 "in": "query",
306 "description": (
307 "Transform JSON output rows. Repeatable.\n\n"
308 "Syntax:\n"
309 '- `array("<sep>", field)`\n'
310 '- `dict("<sep>", field, new_field_1, new_field_2, ...)`\n\n'
311 "Where `<sep>` is a string separator (e.g. `,` or `__`)."
312 ),
313 "required": False,
314 "style": "form",
315 "explode": True,
316 "schema": {"type": "array", "items": {"type": "string"}},
317 },
318 }
def _build_path_params(self, op, raw_path):
    """
    Build path parameter objects for an operation, including examples from #call.

    One required path parameter is produced per name that PARAM_NAME finds in
    raw_path. Its type and pattern come from the operation entry of the same
    name when present, otherwise they default to a string matching '.+'.
    A preprocess hint, a percent-encoded example and a '__' multi-value note
    are attached when applicable.
    """
    preprocess = op.get("preprocess")
    params = []
    for name in findall(PARAM_NAME, raw_path):
        if name in op:
            type_name, shape = self._parse_param_type_shape(op[name])
        else:
            type_name, shape = "str", ".+"

        schema = self._schema_for_ramose_type(type_name)
        # A regex pattern is only attached to string-typed parameters.
        if shape and schema.get("type") == "string":
            schema["pattern"] = shape

        entry = {"name": name, "in": "path", "required": True, "schema": schema}
        hint = self._param_hint_from_preprocess(preprocess, name)
        if hint:
            entry["description"] = hint
        params.append(entry)

    examples = self._extract_param_examples_from_call(raw_path, op.get("call"))
    for entry in params:
        name = entry.get("name")
        if name not in examples:
            continue
        value = examples[name]
        entry["example"] = quote(value, safe="-._~__")
        # Do not overwrite a preprocess hint with the multi-value note.
        if "__" in value and "description" not in entry:
            entry["description"] = "Multiple values can be provided separated by '__'."

    return params
348 def _build_operation_object(self, op, tag_name, path_params, common_param_refs, formats_enum, api_disabled=None):
349 """Build an OpenAPI operation object for a single HTTP method."""
350 summary = op["description"].split("\n")[0].strip() if op.get("description") else ""
351 desc = self._clean_text(op.get("description")) or ""
353 row_schema = self._build_row_schema_from_field_type(op.get("field_type", ""))
354 ok_schema = {"type": "array", "items": row_schema}
355 ok_example = self._try_parse_output_json(op.get("output_json"))
356 ok_content, err_content = self._build_response_content(
357 ok_schema=ok_schema,
358 formats_enum=formats_enum,
359 ok_example=ok_example,
360 err_schema_ref="#/components/schemas/Error",
361 )
363 op_obj: OrderedDict[str, object] = OrderedDict()
364 op_obj["tags"] = [tag_name]
365 op_obj["summary"] = summary
366 op_obj["description"] = desc
367 custom_query_params = []
368 custom_names: set[str] = set()
369 if "custom_params" in op:
370 for name, conf in parse_custom_params(op["custom_params"]).items():
371 custom_names.add(name)
372 param_obj: dict[str, object] = {
373 "name": name,
374 "in": "query",
375 "required": False,
376 "schema": {"type": "string"},
377 }
378 if conf["description"]:
379 param_obj["description"] = conf["description"]
380 custom_query_params.append(param_obj)
382 disabled_names = set(api_disabled) if api_disabled else set()
383 if "disable_params" in op:
384 disabled_names |= parse_disable_params(op["disable_params"])
386 suppressed = custom_names | disabled_names
387 filtered_refs = [ref for ref in common_param_refs if ref["$ref"].rsplit("/", 1)[-1] not in suppressed]
388 op_obj["parameters"] = path_params + custom_query_params + filtered_refs
389 op_obj["responses"] = OrderedDict(
390 [
391 ("200", {"description": "Successful response", "content": ok_content}),
392 ("default", {"description": "Error", "content": err_content}),
393 ]
394 )
396 return op_obj
398 def _build_openapi(self, base_url=None):
399 conf = self._get_conf(base_url)
400 api_meta = conf["conf_json"][0]
401 formats_enum = self._collect_format_tokens(conf)
403 spec = OrderedDict()
404 spec["openapi"] = "3.0.3"
405 spec["info"] = self._build_info(api_meta)
407 base = api_meta.get("base", "")
408 root = api_meta.get("url", "")
409 spec["servers"] = [{"url": f"{base}{root}"}]
411 spec["components"] = {
412 "schemas": {
413 "Error": {
414 "type": "object",
415 "properties": {"error": {"type": "integer"}, "message": {"type": "string"}},
416 "required": ["error", "message"],
417 },
418 },
419 "parameters": self._build_common_parameters(formats_enum),
420 }
422 common_param_refs = [{"$ref": f"#/components/parameters/{name}"} for name in spec["components"]["parameters"]]
424 spec["paths"] = OrderedDict()
425 tag_name = api_meta.get("title", "RAMOSE API")
427 api_disabled = parse_disable_params(api_meta["disable_params"]) if "disable_params" in api_meta else set()
429 for op in conf["conf_json"][1:]:
430 raw_path = op.get("url", "")
431 if raw_path not in spec["paths"]:
432 spec["paths"][raw_path] = OrderedDict()
434 path_params = self._build_path_params(op, raw_path)
436 methods = [mm.lower() for mm in split(r"\s+", op.get("method", "get").strip()) if mm]
437 for m in methods:
438 spec["paths"][raw_path][m] = self._build_operation_object(
439 op,
440 tag_name,
441 path_params,
442 common_param_refs,
443 formats_enum,
444 api_disabled,
445 )
447 return spec
449 # -------------------------
450 # PyYAML compatibility
451 # -------------------------
452 def _to_builtin(self, obj):
453 """Recursively convert OrderedDict (and other non-builtin containers)
454 to plain Python builtins so that yaml.safe_dump can serialize it."""
455 if isinstance(obj, OrderedDict):
456 obj = dict(obj)
457 if isinstance(obj, dict):
458 return {k: self._to_builtin(v) for k, v in obj.items()}
459 if isinstance(obj, (list, tuple, set)):
460 return [self._to_builtin(v) for v in obj]
461 return obj
def _dump_yaml(self, spec):
    """
    Serialize the OpenAPI spec to a YAML string.

    - strings containing a newline are emitted as block scalars (|)
    - keys keep their insertion order (sort_keys=False)
    - non-ASCII characters are written as is (allow_unicode=True)
    """
    str_tag = "tag:yaml.org,2002:str"

    class _BlockScalarDumper(yaml.SafeDumper):
        """Dumper subclass on which the string representer is registered."""

    def _represent_str(dumper, value):
        # Block style only for multi-line text; otherwise the default style.
        scalar_style = "|" if "\n" in value else None
        return dumper.represent_scalar(str_tag, value, style=scalar_style)

    _BlockScalarDumper.add_representer(str, _represent_str)
    return yaml.dump(spec, Dumper=_BlockScalarDumper, sort_keys=False, allow_unicode=True)
def get_documentation(self, base_url=None):
    """
    Produce the OpenAPI YAML document for a configuration.

    Returns a (status, body) pair: the integer 200 and the YAML text.
    """
    plain_spec = self._to_builtin(self._build_openapi(base_url=base_url))
    return 200, self._dump_yaml(plain_spec)
def store_documentation(self, file_path, base_url=None):
    """Write the YAML produced by get_documentation to file_path, encoded as UTF-8."""
    _status, yaml_text = self.get_documentation(base_url=base_url)[:2]
    Path(file_path).write_text(yaml_text, encoding="utf8")
def get_index(self, *_args, **_dargs):
    """Return a minimal placeholder message; the current UI does not use it."""
    placeholder = "OpenAPI exporter available."
    return placeholder