Coverage for ramose / openapi_documentation.py: 99%

247 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-15 15:58 +0000

1# SPDX-FileCopyrightText: 2018-2021 Silvio Peroni <silvio.peroni@unibo.it> 

2# SPDX-FileCopyrightText: 2020-2021 Marilena Daquino <marilena.daquino2@unibo.it> 

3# SPDX-FileCopyrightText: 2022 Davide Brembilla 

4# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it> 

5# SPDX-FileCopyrightText: 2025 Sergei Slinkin 

6# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

7# 

8# SPDX-License-Identifier: ISC 

9 

10import json 

11import re 

12from collections import OrderedDict 

13from pathlib import Path 

14from re import findall, split 

15from urllib.parse import quote 

16 

17import yaml 

18 

19from ramose._constants import FIELD_TYPE_RE, PARAM_NAME 

20from ramose.documentation import DocumentationHandler 

21from ramose.hash_format import parse_custom_params, parse_disable_params 

22 

23 

class OpenAPIDocumentationHandler(DocumentationHandler):
    """
    Export RAMOSE .hf configuration(s) to an OpenAPI 3.0 YAML specification.

    Notes:
    - OpenAPI is a surface contract. RAMOSE implementation details (endpoint, addon, method,
      preprocess, postprocess) are intentionally omitted as they are not meaningful to API consumers.
    """

    # -------------------------
    # Small utilities
    # -------------------------
    def _normalize_base_url(self, base_url: str) -> str:
        """Return *base_url* with one leading '/' removed (if present)."""
        return base_url.removeprefix("/")

    def _get_conf(self, base_url: str | None = None):
        """
        Return the entry of ``self.conf_doc`` that corresponds to *base_url*.

        When *base_url* is None the first entry of ``self.conf_doc`` is used.
        Otherwise the entry is looked up under "/" + the normalized base URL,
        and a missing key raises ``KeyError``.
        """
        if base_url is None:
            first_key = next(iter(self.conf_doc))
            return self.conf_doc[first_key]
        return self.conf_doc["/" + self._normalize_base_url(base_url)]

    def _schema_for_ramose_type(self, t):
        """
        Map a RAMOSE type name to an OpenAPI schema dict.

        Unknown or missing types fall back to ``{"type": "string"}``. The table
        is rebuilt on every call, so the returned dict is always a fresh object
        that the caller may mutate (e.g. to add a ``pattern``).
        """
        known = {
            "int": {"type": "integer"},
            "float": {"type": "number"},
            "datetime": {"type": "string", "format": "date-time"},
            # OpenAPI doesn't standardize duration; still useful as hint.
            "duration": {"type": "string", "format": "duration"},
        }
        return known.get((t or "str").strip().lower(), {"type": "string"})

    def _parse_param_type_shape(self, s):
        """
        Split a parameter declaration of the form ``type(regex)``.

        Returns a ``(type, regex)`` tuple, both stripped. Anything that does not
        match that form (including non-string input) yields ``("str", ".+")``.
        """
        default = ("str", ".+")
        # Non-string input would make the regex call raise TypeError.
        if not isinstance(s, str):
            return default
        m = re.match(r"^\s*([^\(]+)\((.+)\)\s*$", s)
        if m is None:
            return default
        t, shape = m.groups()
        return t.strip(), shape.strip()

    def _guess_contact(self, contacts_value):
        """
        Table 1: '#contacts <contact_url>' but in practice it's often an email.

        Build an OpenAPI contact object from the raw value:
        - ``{"email": ...}`` when it looks like an email address;
        - ``{"url": ...}`` when it is an http(s) URL without spaces;
        - ``{"name": ...}`` otherwise.
        Returns None when the value is empty.
        """
        if not contacts_value:
            return None
        c = str(contacts_value).strip()
        if "@" in c and " " not in c and "/" not in c:
            return {"email": c}
        # The OpenAPI Contact Object has a dedicated 'url' field; a URL placed
        # in 'name' would be shown as plain text instead of a link.
        if " " not in c and c.lower().startswith(("http://", "https://")):
            return {"url": c}
        return {"name": c}

    def _clean_text(self, v):
        """
        Normalize text coming from .hf parsing so Swagger/ YAML render nicely:
        - remove wrapping quotes if they were included as part of the value
        - turn literal '\\n' into real newlines
        - trim whitespace

        Returns None when *v* is None.
        """
        if v is None:
            return None
        s = str(v).strip()
        # Strip wrapping quotes if parser stored them as part of the value
        if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
            s = s[1:-1].strip()
        # Convert literal backslash-n sequences to actual newlines
        return s.replace("\\n", "\n")

    def _param_hint_from_preprocess(self, preprocess_str, param_name):
        """
        Table 2: preprocess functions like 'lower(doi) --> split_dois(dois)'.
        Not formalizable in OpenAPI, but helpful as a hint.

        Returns a note quoting the whole preprocess string when *param_name*
        appears as a whole word inside some parenthesised argument list, and an
        empty string otherwise.
        """
        if not preprocess_str:
            return ""
        s = str(preprocess_str)
        # Any function call mentioning the param inside (...)?
        if re.search(r"\([^)]*\b" + re.escape(param_name) + r"\b[^)]*\)", s):
            return f"Note: input is pre-processed by RAMOSE: {s}"
        return ""

    def _try_parse_output_json(self, output_json_value):
        """
        Table 2: '#output_json <ex_response>' (JSON example).

        Returns the decoded JSON value, or None when the value is empty or
        cannot be decoded.
        """
        if not output_json_value:
            return None
        try:
            return json.loads(output_json_value)
        except (ValueError, TypeError):
            return None

    # -------------------------
    # Formats / media-types
    # -------------------------
    def _collect_format_tokens(self, conf):
        """
        Return the sorted list of format names declared by the operations.

        Each operation's ``format`` value (a string or a list of strings) is
        split on ';' and the text before the first ',' of every piece is taken
        as the format name. "csv" and "json" are always included.
        """
        # always supported by RAMOSE docs
        formats = {"csv", "json"}
        for op in conf["conf_json"][1:]:
            if "format" not in op:
                continue
            fm_val = op["format"]
            fm_list = fm_val if isinstance(fm_val, list) else [fm_val]
            for fm in fm_list:
                for raw_part in str(fm).split(";"):
                    # expected "fmt,func"
                    fmt = raw_part.strip().split(",", 1)[0].strip()
                    if fmt:
                        formats.add(fmt)
        return sorted(formats)

    def _media_type_for_format(self, fmt):
        """Return the media type for a format name (case-insensitive), or None if unknown."""
        fmt = (fmt or "").strip().lower()
        mapping = {
            "json": "application/json",
            "csv": "text/csv",
            "xml": "application/xml",
            "rdfxml": "application/rdf+xml",
            "rdf+xml": "application/rdf+xml",
            "ttl": "text/turtle",
            "turtle": "text/turtle",
            "nt": "application/n-triples",
            "ntriples": "application/n-triples",
            "n-triples": "application/n-triples",
            "nq": "application/n-quads",
            "n-quads": "application/n-quads",
            "trig": "application/trig",
        }
        return mapping.get(fmt)

    def _content_for_formats(self, json_entry, formats_enum):
        """
        Build one OpenAPI 'content' mapping.

        'application/json' gets *json_entry*; 'text/csv' and every other
        format with a known media type are described as plain strings.
        """
        content = OrderedDict()
        content["application/json"] = json_entry
        content["text/csv"] = {"schema": {"type": "string"}}
        # Other formats discovered in .hf (#format)
        for fmt in formats_enum or []:
            mt = self._media_type_for_format(fmt)
            if mt is None or mt in content:
                continue
            content[mt] = {"schema": {"type": "string"}}
        return content

    def _build_response_content(self, ok_schema, formats_enum, ok_example=None, err_schema_ref=None):
        """
        Build OpenAPI 'content' dict for responses based on supported formats.
        JSON gets structured schema. Others are represented as string payloads.

        Returns the success content alone, or a ``(content, err_content)``
        tuple when *err_schema_ref* is provided.
        """
        json_entry = {"schema": ok_schema}
        if ok_example is not None:
            json_entry["examples"] = {"example": {"value": ok_example}}
        content = self._content_for_formats(json_entry, formats_enum)

        if err_schema_ref:
            err_content = self._content_for_formats({"schema": {"$ref": err_schema_ref}}, formats_enum)
            return content, err_content

        return content

    # -------------------------
    # Examples from #call
    # -------------------------
    def _extract_param_examples_from_call(self, path_template, call_value):
        """
        Given a template like '/metadata/{dois}' and a call like
        '/metadata/10.1/abc__10.2/xyz', return {'dois': '10.1/abc__10.2/xyz'}.

        IMPORTANT: RAMOSE allows slashes inside the last param because it routes
        everything via <path:api_url>. OpenAPI tooling typically expects these
        slashes to be URL-encoded in examples.

        Returns an empty dict when there is no call, when the call does not
        match the template, or when the template cannot be turned into a valid
        regular expression.
        """
        if not call_value:
            return {}

        # Only the path is matched; any query string is dropped.
        call_path = str(call_value).split("?", 1)[0].strip()

        parts = path_template.split("/")
        # Allow '/' inside the LAST parameter segment (captures the rest of the path)
        last_index = len(parts) - 1

        re_parts = []
        for i, part in enumerate(parts):
            if part.startswith("{") and part.endswith("}"):
                name = part[1:-1]
                # last param: capture everything to end, including slashes
                # middle params: standard segment (no slash)
                body = ".+" if i == last_index else "[^/]+"
                re_parts.append(rf"(?P<{name}>{body})")
            else:
                re_parts.append(re.escape(part))

        pat = "^" + "/".join(re_parts) + "$"
        try:
            m = re.match(pat, call_path)
        except re.error:
            # A placeholder that is not a valid (or is a repeated) group name
            # cannot be compiled; treat it as "no example available".
            return {}
        if not m:
            return {}
        return {k: v for k, v in m.groupdict().items() if v is not None}

    # -------------------------
    # Schema from field_type
    # -------------------------
    def _build_row_schema_from_field_type(self, field_type_str):
        """Build an object schema with one property per (type, field) pair matched by FIELD_TYPE_RE."""
        props = OrderedDict()
        for t, f in findall(FIELD_TYPE_RE, field_type_str or ""):
            props[f] = self._schema_for_ramose_type(t)
        return {"type": "object", "properties": props}

    # -------------------------
    # Main builder
    # -------------------------
    def _build_info(self, api_meta):
        """Build the OpenAPI info object from API metadata."""
        info: OrderedDict[str, object] = OrderedDict()
        info["title"] = api_meta.get("title", "RAMOSE API")
        info["version"] = api_meta.get("version", "0.0.0")
        if "description" in api_meta:
            info["description"] = api_meta["description"]
        if "license" in api_meta:
            info["license"] = {"name": api_meta["license"]}
        if "contacts" in api_meta:
            contact_obj = self._guess_contact(api_meta.get("contacts"))
            if contact_obj:
                info["contact"] = contact_obj
        return info

    @staticmethod
    def _build_common_parameters(formats_enum):
        """
        Build the shared query parameter definitions.

        Returns a dict keyed by parameter name (require, filter, sort, format,
        json). *formats_enum* becomes the enum of the 'format' parameter.
        """

        def repeatable(name, description):
            # Repeatable string parameter: ?name=a&name=b
            return {
                "name": name,
                "in": "query",
                "description": description,
                "required": False,
                "style": "form",
                "explode": True,
                "schema": {"type": "array", "items": {"type": "string"}},
            }

        return {
            "require": repeatable(
                "require",
                "Remove rows that have an empty value in the specified field. Repeatable.",
            ),
            "filter": repeatable(
                "filter",
                (
                    "Filter rows. Repeatable.\n\n"
                    "Syntax: `field:opvalue` where `op` is one of `=`, `<`, `>`.\n"
                    "If `op` is omitted, `value` is treated as a regex."
                ),
            ),
            "sort": repeatable(
                "sort",
                "Sort rows. Syntax: asc(field) or desc(field). Repeatable.",
            ),
            "format": {
                "name": "format",
                "in": "query",
                "description": "Force output format (overrides Accept header).",
                "required": False,
                "schema": {"type": "string", "enum": formats_enum},
            },
            "json": repeatable(
                "json",
                (
                    "Transform JSON output rows. Repeatable.\n\n"
                    "Syntax:\n"
                    '- `array("<sep>", field)`\n'
                    '- `dict("<sep>", field, new_field_1, new_field_2, ...)`\n\n'
                    "Where `<sep>` is a string separator (e.g. `,` or `__`)."
                ),
            ),
        }

    def _build_path_params(self, op, raw_path):
        """Build path parameter objects for an operation, including examples from #call."""
        path_params = []
        for p in findall(PARAM_NAME, raw_path):
            t, shape = ("str", ".+")
            if p in op:
                t, shape = self._parse_param_type_shape(op[p])

            schema = self._schema_for_ramose_type(t)
            # A regex pattern is only attached to string-typed parameters.
            if schema.get("type") == "string" and shape:
                schema["pattern"] = shape

            param_obj = {"name": p, "in": "path", "required": True, "schema": schema}
            hint = self._param_hint_from_preprocess(op.get("preprocess"), p)
            if hint:
                param_obj["description"] = hint
            path_params.append(param_obj)

        call_examples = self._extract_param_examples_from_call(raw_path, op.get("call"))
        for param in path_params:
            nm = param.get("name")
            if nm not in call_examples:
                continue
            raw_example = call_examples[nm]
            # '/' is not in the safe set, so slashes are percent-encoded.
            param["example"] = quote(raw_example, safe="-._~__")
            if "__" in raw_example and "description" not in param:
                param["description"] = "Multiple values can be provided separated by '__'."

        return path_params

    def _build_operation_object(self, op, tag_name, path_params, common_param_refs, formats_enum, api_disabled=None):
        """
        Build an OpenAPI operation object for a single HTTP method.

        The operation lists, in order: the path parameters, the custom query
        parameters declared in ``op["custom_params"]``, and the references to
        the shared parameters. A shared parameter is left out when its name is
        redefined as a custom parameter or is disabled, either API-wide
        (*api_disabled*) or by ``op["disable_params"]``.
        """
        desc = self._clean_text(op.get("description")) or ""
        # Take the summary from the cleaned text so wrapping quotes and literal
        # '\n' sequences do not leak into it.
        summary = desc.split("\n", 1)[0].strip()

        row_schema = self._build_row_schema_from_field_type(op.get("field_type", ""))
        ok_schema = {"type": "array", "items": row_schema}
        ok_example = self._try_parse_output_json(op.get("output_json"))
        ok_content, err_content = self._build_response_content(
            ok_schema=ok_schema,
            formats_enum=formats_enum,
            ok_example=ok_example,
            err_schema_ref="#/components/schemas/Error",
        )

        op_obj: OrderedDict[str, object] = OrderedDict()
        op_obj["tags"] = [tag_name]
        op_obj["summary"] = summary
        op_obj["description"] = desc

        custom_query_params = []
        custom_names: set[str] = set()
        if "custom_params" in op:
            for name, conf in parse_custom_params(op["custom_params"]).items():
                custom_names.add(name)
                param_obj: dict[str, object] = {
                    "name": name,
                    "in": "query",
                    "required": False,
                    "schema": {"type": "string"},
                }
                if conf["description"]:
                    param_obj["description"] = conf["description"]
                custom_query_params.append(param_obj)

        disabled_names = set(api_disabled) if api_disabled else set()
        if "disable_params" in op:
            disabled_names |= parse_disable_params(op["disable_params"])

        suppressed = custom_names | disabled_names
        # The last segment of the $ref is the shared parameter's name.
        filtered_refs = [ref for ref in common_param_refs if ref["$ref"].rsplit("/", 1)[-1] not in suppressed]
        op_obj["parameters"] = path_params + custom_query_params + filtered_refs
        op_obj["responses"] = OrderedDict(
            [
                ("200", {"description": "Successful response", "content": ok_content}),
                ("default", {"description": "Error", "content": err_content}),
            ]
        )

        return op_obj

    def _build_openapi(self, base_url=None):
        """
        Build the OpenAPI document as nested dicts/lists.

        The first item of ``conf["conf_json"]`` is read as the API metadata;
        every following item is an operation, added under its ``url`` once per
        HTTP method listed in its ``method`` value (default "get").
        """
        conf = self._get_conf(base_url)
        api_meta = conf["conf_json"][0]
        formats_enum = self._collect_format_tokens(conf)

        spec = OrderedDict()
        spec["openapi"] = "3.0.3"
        spec["info"] = self._build_info(api_meta)

        base = api_meta.get("base", "")
        root = api_meta.get("url", "")
        spec["servers"] = [{"url": f"{base}{root}"}]

        spec["components"] = {
            "schemas": {
                "Error": {
                    "type": "object",
                    "properties": {"error": {"type": "integer"}, "message": {"type": "string"}},
                    "required": ["error", "message"],
                },
            },
            "parameters": self._build_common_parameters(formats_enum),
        }

        common_param_refs = [{"$ref": f"#/components/parameters/{name}"} for name in spec["components"]["parameters"]]

        spec["paths"] = OrderedDict()
        tag_name = api_meta.get("title", "RAMOSE API")

        api_disabled = parse_disable_params(api_meta["disable_params"]) if "disable_params" in api_meta else set()

        for op in conf["conf_json"][1:]:
            raw_path = op.get("url", "")
            if raw_path not in spec["paths"]:
                spec["paths"][raw_path] = OrderedDict()

            path_params = self._build_path_params(op, raw_path)

            # '#method' may list several whitespace-separated HTTP methods.
            methods = [mm.lower() for mm in split(r"\s+", op.get("method", "get").strip()) if mm]
            for m in methods:
                spec["paths"][raw_path][m] = self._build_operation_object(
                    op,
                    tag_name,
                    path_params,
                    common_param_refs,
                    formats_enum,
                    api_disabled,
                )

        return spec

    # -------------------------
    # PyYAML compatibility
    # -------------------------
    def _to_builtin(self, obj):
        """Recursively convert OrderedDict (and other non-builtin containers)
        to plain Python builtins so that yaml.safe_dump can serialize it.

        Any dict (OrderedDict included) becomes a plain dict with the same key
        order; lists, tuples and sets become lists; other values are returned
        unchanged.
        """
        if isinstance(obj, dict):
            return {k: self._to_builtin(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple, set)):
            return [self._to_builtin(v) for v in obj]
        return obj

    def _dump_yaml(self, spec):
        """
        Dump OpenAPI spec to YAML with nice formatting:
        - multiline strings become block scalars (|)
        - keys keep insertion order (sort_keys=False)
        """

        # The representer is registered on a local subclass, not on
        # yaml.SafeDumper itself.
        class _RamoseYamlDumper(yaml.SafeDumper):
            pass

        def _str_presenter(dumper, data):
            if "\n" in data:
                return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
            return dumper.represent_scalar("tag:yaml.org,2002:str", data)

        _RamoseYamlDumper.add_representer(str, _str_presenter)
        return yaml.dump(spec, Dumper=_RamoseYamlDumper, sort_keys=False, allow_unicode=True)

    def get_documentation(self, base_url=None):
        """Return ``(200, yaml_text)`` with the OpenAPI document for *base_url*."""
        spec = self._build_openapi(base_url=base_url)
        spec = self._to_builtin(spec)
        yml = self._dump_yaml(spec)
        return 200, yml

    def store_documentation(self, file_path, base_url=None):
        """Write the OpenAPI YAML for *base_url* to *file_path* (UTF-8, overwriting)."""
        yml = self.get_documentation(base_url=base_url)[1]
        with Path(file_path).open("w", encoding="utf8") as f:
            f.write(yml)

    def get_index(self, *_args, **_dargs):
        """Return a placeholder string; all arguments are ignored."""
        # Not used by the current UI. Keep a minimal placeholder.
        return "OpenAPI exporter available."