Coverage for ramose / openapi_documentation.py: 99%

404 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-01 13:49 +0000

1# SPDX-FileCopyrightText: 2018-2021 Silvio Peroni <silvio.peroni@unibo.it> 

2# SPDX-FileCopyrightText: 2020-2021 Marilena Daquino <marilena.daquino2@unibo.it> 

3# SPDX-FileCopyrightText: 2022 Davide Brembilla 

4# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it> 

5# SPDX-FileCopyrightText: 2025 Sergei Slinkin 

6# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

7# 

8# SPDX-License-Identifier: ISC 

9 

10from __future__ import annotations 

11 

12import csv 

13import importlib.resources 

14import json 

15import re 

16from collections import OrderedDict 

17from dataclasses import dataclass 

18from functools import cache 

19from html.parser import HTMLParser 

20from io import StringIO 

21from pathlib import Path 

22from re import findall, split 

23from typing import TYPE_CHECKING, overload 

24 

25import yaml 

26from markdown import markdown 

27 

28from ramose._constants import FIELD_TYPE_RE, FORMAT_PARTS_WITH_MEDIA_TYPE, PARAM_NAME, media_type_for_format 

29from ramose.documentation import DocumentationHandler 

30from ramose.hash_format import parse_auth, parse_custom_params, parse_disable_params 

31 

32if TYPE_CHECKING: 

33 from ramose.api_manager import APIConfig 

34 

35_MIN_QUOTED_LENGTH = 2 

36 

37# swagger-ui renders inline Markdown `code` with 5px vertical padding, so code spans on consecutive 

38# lines overlap (https://github.com/swagger-api/swagger-ui/issues/7569) 

39SWAGGER_MARKDOWN_CSS_FIX = ( 

40 "\n.swagger-ui .renderedMarkdown code,\n.swagger-ui .markdown code{padding:0 7px!important}\n" 

41 ".swagger-ui .renderedMarkdown li,\n.swagger-ui .markdown li{margin:6px 0!important}\n" 

42) 

43 

44 

45@cache 

46def _read_swagger_asset(filename: str) -> str: 

47 return importlib.resources.files("flask_swagger_ui").joinpath(f"dist/{filename}").read_text(encoding="utf-8") 

48 

49 

50@dataclass 

51class _OpenAPIBuildContext: 

52 tag_name: str 

53 common_param_refs: list[dict[str, str]] 

54 formats_enum: list[str] 

55 api_disabled: set[str] 

56 api_auth: bool 

57 

58 

59class _MarkupParser(HTMLParser): 

60 def __init__(self) -> None: 

61 super().__init__() 

62 self._text_fragments: list[str] = [] 

63 self.links: list[tuple[str, str]] = [] 

64 self._current_link_href: str | None = None 

65 self._current_link_label_fragments: list[str] = [] 

66 

67 def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: 

68 if tag == "a": 

69 self._current_link_href = dict(attrs).get("href") 

70 self._current_link_label_fragments = [] 

71 

72 def handle_endtag(self, tag: str) -> None: 

73 if tag == "a" and self._current_link_href is not None: 

74 label = "".join(self._current_link_label_fragments).strip() 

75 self.links.append((self._current_link_href, label)) 

76 self._current_link_href = None 

77 

78 def handle_data(self, data: str) -> None: 

79 self._text_fragments.append(data) 

80 if self._current_link_href is not None: 

81 self._current_link_label_fragments.append(data) 

82 

83 @property 

84 def text(self) -> str: 

85 return "".join(self._text_fragments).strip() 

86 

87 

88class OpenAPIDocumentationHandler(DocumentationHandler): 

89 def _normalize_base_url(self, base_url: str) -> str: 

90 return base_url.removeprefix("/") 

91 

92 def _get_conf(self, base_url: str | None = None) -> APIConfig: 

93 if base_url is None: 

94 first_key = next(iter(self.conf_doc)) 

95 return self.conf_doc[first_key] 

96 normalized = self._normalize_base_url(base_url) 

97 return self.conf_doc["/" + normalized] 

98 

99 def _schema_for_ramose_type(self, t: str | None) -> dict[str, str]: 

100 t = (t or "str").strip().lower() 

101 if t == "int": 

102 return {"type": "integer"} 

103 if t == "float": 

104 return {"type": "number"} 

105 if t == "datetime": 

106 return {"type": "string", "format": "date-time"} 

107 if t == "duration": 

108 return {"type": "string", "format": "duration"} 

109 return {"type": "string"} 

110 

111 def _parse_param_type_shape(self, s: str) -> tuple[str, str]: 

112 try: 

113 t, shape = findall(r"^\s*([^\(]+)\((.+)\)\s*$", s)[0] 

114 return t.strip(), shape.strip() 

115 except (IndexError, ValueError): 

116 return "str", ".+" 

117 

118 def _parse_markup(self, text: str) -> _MarkupParser: 

119 parser = _MarkupParser() 

120 parser.feed(markdown(text)) 

121 return parser 

122 

123 def _guess_contact(self, contacts_value: object) -> dict[str, str] | None: 

124 if not contacts_value: 

125 return None 

126 parsed = self._parse_markup(str(contacts_value).strip()) 

127 for href, label in parsed.links: 

128 if href.startswith("mailto:"): 

129 return {"email": href.removeprefix("mailto:")} 

130 if href.startswith(("http://", "https://")): 

131 return {"name": label or parsed.text, "url": href} 

132 plain_text = parsed.text 

133 looks_like_bare_email = "@" in plain_text and " " not in plain_text and "/" not in plain_text 

134 if looks_like_bare_email: 

135 return {"email": plain_text} 

136 return {"name": plain_text} 

137 

138 def _clean_text(self, v: object) -> str | None: 

139 if v is None: 

140 return None 

141 s = str(v).strip() 

142 if len(s) >= _MIN_QUOTED_LENGTH and ((s[0] == s[-1] == '"') or (s[0] == s[-1] == "'")): 

143 s = s[1:-1].strip() 

144 return s.replace("\\n", "\n") 

145 

146 def _param_hint_from_preprocess(self, preprocess_str: object, param_name: str) -> str: 

147 if not preprocess_str: 

148 return "" 

149 s = str(preprocess_str) 

150 if re.search(r"\([^)]*\b" + re.escape(param_name) + r"\b[^)]*\)", s): 

151 return f"Note: input is pre-processed by RAMOSE: {s}" 

152 return "" 

153 

154 def _try_parse_output_json(self, output_json_value: str | None) -> object: 

155 if not output_json_value: 

156 return None 

157 try: 

158 return json.loads(output_json_value) 

159 except (ValueError, TypeError): 

160 return None 

161 

162 def _collect_format_tokens(self, conf: APIConfig) -> list[str]: 

163 formats = {"csv", "json"} 

164 for op in conf["conf_json"][1:]: 

165 if "format" in op: 

166 fm_val = op["format"] 

167 fm_list = fm_val if isinstance(fm_val, list) else [fm_val] 

168 for fm in fm_list: 

169 for raw_part in str(fm).split(";"): 

170 part = raw_part.strip() 

171 if not part: 

172 continue 

173 fmt = part.split(",", 1)[0].strip() 

174 if fmt: 

175 formats.add(fmt) 

176 return sorted(formats) 

177 

178 def _format_media_type_map(self, op: dict[str, str]) -> dict[str, str]: 

179 if "format" not in op: 

180 return {} 

181 raw_value = op["format"] 

182 declarations = raw_value if isinstance(raw_value, list) else [raw_value] 

183 declared_media_types: dict[str, str] = {} 

184 for declaration in declarations: 

185 for part in str(declaration).split(";"): 

186 fields = [field.strip() for field in part.split(",")] 

187 if len(fields) >= FORMAT_PARTS_WITH_MEDIA_TYPE and fields[0] and fields[2]: 

188 declared_media_types[fields[0]] = fields[2] 

189 return declared_media_types 

190 

191 def _single_response_media_type(self, op: dict[str, str]) -> str: 

192 default_format = op["default_format"].strip() if "default_format" in op else "json" 

193 if default_format == "json": 

194 return "application/json" 

195 if default_format == "csv": 

196 return "text/csv" 

197 declared_media_types = self._format_media_type_map(op) 

198 if default_format in declared_media_types: 

199 return declared_media_types[default_format] 

200 return media_type_for_format(default_format) or "application/json" 

201 

202 def _csv_example(self, example: object) -> str | None: 

203 if not isinstance(example, list): 

204 return None 

205 dict_rows = [row for row in example if isinstance(row, dict)] 

206 if not dict_rows: 

207 return None 

208 scalar_cell_types = (str, int, float, bool, type(None)) 

209 all_cells_are_scalar = all(isinstance(value, scalar_cell_types) for row in dict_rows for value in row.values()) 

210 if not all_cells_are_scalar: 

211 return None 

212 fieldnames = list(dict.fromkeys(key for row in dict_rows for key in row)) 

213 buffer = StringIO() 

214 writer = csv.DictWriter(buffer, fieldnames=fieldnames) 

215 writer.writeheader() 

216 writer.writerows(dict_rows) 

217 return buffer.getvalue() 

218 

219 def _content_entry(self, media_type: str, ok_schema: dict[str, object], ok_example: object) -> dict[str, object]: 

220 is_json = media_type == "application/json" or media_type.endswith("+json") 

221 entry: dict[str, object] = {"schema": ok_schema if is_json else {"type": "string"}} 

222 if is_json: 

223 example = ok_example 

224 elif media_type == "text/csv": 

225 example = self._csv_example(ok_example) 

226 else: 

227 example = None 

228 if example is not None: 

229 entry["examples"] = {"example": {"value": example}} 

230 return entry 

231 

232 @overload 

233 def _build_response_content( 

234 self, 

235 ok_schema: dict[str, object], 

236 declared_media_types: dict[str, str], 

237 ok_example: object = ..., 

238 err_schema_ref: None = ..., 

239 ) -> OrderedDict[str, dict[str, object]]: ... 

240 

241 @overload 

242 def _build_response_content( 

243 self, 

244 ok_schema: dict[str, object], 

245 declared_media_types: dict[str, str], 

246 ok_example: object = ..., 

247 *, 

248 err_schema_ref: str, 

249 ) -> tuple[OrderedDict[str, dict[str, object]], OrderedDict[str, dict[str, object]]]: ... 

250 

251 def _build_response_content( 

252 self, 

253 ok_schema: dict[str, object], 

254 declared_media_types: dict[str, str], 

255 ok_example: object = None, 

256 err_schema_ref: str | None = None, 

257 ) -> ( 

258 OrderedDict[str, dict[str, object]] 

259 | tuple[OrderedDict[str, dict[str, object]], OrderedDict[str, dict[str, object]]] 

260 ): 

261 media_types = dict.fromkeys(["application/json", "text/csv", *declared_media_types.values()]) 

262 content: OrderedDict[str, dict[str, object]] = OrderedDict( 

263 (media_type, self._content_entry(media_type, ok_schema, ok_example)) for media_type in media_types 

264 ) 

265 

266 if err_schema_ref: 

267 err_content: OrderedDict[str, dict[str, object]] = OrderedDict() 

268 err_content["application/json"] = {"schema": {"$ref": err_schema_ref}} 

269 err_content["text/csv"] = {"schema": {"type": "string"}} 

270 return content, err_content 

271 

272 return content 

273 

274 def _build_single_format_response( 

275 self, 

276 op: dict[str, str], 

277 ok_schema: dict[str, object], 

278 ok_example: object, 

279 ) -> tuple[OrderedDict[str, dict[str, object]], OrderedDict[str, dict[str, object]]]: 

280 media_type = self._single_response_media_type(op) 

281 ok_content: OrderedDict[str, dict[str, object]] = OrderedDict( 

282 [(media_type, self._content_entry(media_type, ok_schema, ok_example))], 

283 ) 

284 err_content: OrderedDict[str, dict[str, object]] = OrderedDict( 

285 [("application/json", {"schema": {"$ref": "#/components/schemas/Error"}})], 

286 ) 

287 return ok_content, err_content 

288 

289 def _extract_param_examples_from_call(self, path_template: str, call_value: object) -> dict[str, str]: 

290 if not call_value: 

291 return {} 

292 

293 call_path = str(call_value).split("?", 1)[0].strip() 

294 

295 parts = path_template.split("/") 

296 re_parts = [] 

297 

298 last_index = len(parts) - 1 

299 

300 for i, part in enumerate(parts): 

301 if part.startswith("{") and part.endswith("}"): 

302 name = part[1:-1] 

303 # Last param captures slashes too (RAMOSE routes via <path:api_url>) 

304 if i == last_index: 

305 re_parts.append(rf"(?P<{name}>.+)") 

306 else: 

307 re_parts.append(rf"(?P<{name}>[^/]+)") 

308 else: 

309 re_parts.append(re.escape(part)) 

310 

311 pat = "^" + "/".join(re_parts) + "$" 

312 m = re.match(pat, call_path) 

313 if not m: 

314 return {} 

315 return {k: v for k, v in m.groupdict().items() if v is not None} 

316 

317 def _build_row_schema_from_field_type(self, field_type_str: str) -> dict[str, object]: 

318 props = OrderedDict() 

319 for t, f in findall(FIELD_TYPE_RE, field_type_str or ""): 

320 props[f] = self._schema_for_ramose_type(t) 

321 return {"type": "object", "properties": props} 

322 

323 def _infer_schema_from_value(self, value: object) -> dict[str, object]: 

324 primitive_type_map: dict[type, str] = {bool: "boolean", int: "integer", float: "number", str: "string"} 

325 for py_type, json_type in primitive_type_map.items(): 

326 if isinstance(value, py_type): 

327 return {"type": json_type} 

328 if isinstance(value, list): 

329 result: dict[str, object] = {"type": "array"} 

330 if value: 

331 schemas = [self._infer_schema_from_value(item) for item in value] 

332 if all(schema == schemas[0] for schema in schemas): 

333 result["items"] = schemas[0] 

334 return result 

335 if isinstance(value, dict): 

336 if value: 

337 return {"type": "object", "properties": {k: self._infer_schema_from_value(v) for k, v in value.items()}} 

338 return {"type": "object"} 

339 return {} 

340 

341 def _build_info(self, api_meta: dict[str, str]) -> OrderedDict[str, object]: 

342 info: OrderedDict[str, object] = OrderedDict() 

343 info["title"] = self._parse_markup(api_meta.get("title", "RAMOSE API")).text 

344 version_text = self._parse_markup(api_meta.get("version", "0.0.0")).text 

345 info["version"] = re.sub(r"^version\s+", "", version_text, flags=re.IGNORECASE) 

346 if "description" in api_meta: 

347 info["description"] = api_meta["description"] 

348 if "license" in api_meta: 

349 parsed_license = self._parse_markup(api_meta["license"]) 

350 license_url = next( 

351 (href for href, _ in parsed_license.links if href.startswith(("http://", "https://"))), 

352 None, 

353 ) 

354 license_obj: dict[str, str] = {"name": parsed_license.text} 

355 if license_url is not None: 

356 license_obj["url"] = license_url 

357 info["license"] = license_obj 

358 if "contacts" in api_meta: 

359 contact_obj = self._guess_contact(api_meta["contacts"]) 

360 if contact_obj: 

361 info["contact"] = contact_obj 

362 return info 

363 

364 @staticmethod 

365 def _build_common_parameters(formats_enum: list[str]) -> dict[str, dict[str, object]]: 

366 return { 

367 "require": { 

368 "name": "require", 

369 "in": "query", 

370 "description": "Remove rows that have an empty value in the specified field. Repeatable.", 

371 "required": False, 

372 "style": "form", 

373 "explode": True, 

374 "schema": {"type": "array", "items": {"type": "string"}}, 

375 }, 

376 "filter": { 

377 "name": "filter", 

378 "in": "query", 

379 "description": ( 

380 "Filter rows. Repeatable.\n\n" 

381 "Syntax: `field:opvalue` where `op` is one of `=`, `<`, `>`.\n" 

382 "If `op` is omitted, `value` is treated as a regex." 

383 ), 

384 "required": False, 

385 "style": "form", 

386 "explode": True, 

387 "schema": {"type": "array", "items": {"type": "string"}}, 

388 }, 

389 "sort": { 

390 "name": "sort", 

391 "in": "query", 

392 "description": "Sort rows. Syntax: asc(field) or desc(field). Repeatable.", 

393 "required": False, 

394 "style": "form", 

395 "explode": True, 

396 "schema": {"type": "array", "items": {"type": "string"}}, 

397 }, 

398 "format": { 

399 "name": "format", 

400 "in": "query", 

401 "description": "Force output format (overrides Accept header).", 

402 "required": False, 

403 "schema": {"type": "string", "enum": formats_enum}, 

404 }, 

405 "json": { 

406 "name": "json", 

407 "in": "query", 

408 "description": ( 

409 "Transform JSON output rows. Repeatable.\n\n" 

410 "Syntax:\n" 

411 '- `array("<sep>", field)`\n' 

412 '- `dict("<sep>", field, new_field_1, new_field_2, ...)`\n\n' 

413 "Where `<sep>` is a string separator (e.g. `,` or `__`)." 

414 ), 

415 "required": False, 

416 "style": "form", 

417 "explode": True, 

418 "schema": {"type": "array", "items": {"type": "string"}}, 

419 }, 

420 } 

421 

422 def _build_path_params(self, op: dict[str, str], raw_path: str) -> list[dict[str, object]]: 

423 path_params = [] 

424 for p in findall(PARAM_NAME, raw_path): 

425 t, shape = ("str", ".+") 

426 if p in op: 

427 t, shape = self._parse_param_type_shape(op[p]) 

428 

429 schema = self._schema_for_ramose_type(t) 

430 if schema.get("type") == "string" and shape: 

431 schema["pattern"] = shape 

432 

433 param_obj = {"name": p, "in": "path", "required": True, "schema": schema} 

434 hint = self._param_hint_from_preprocess(op.get("preprocess"), p) 

435 if hint: 

436 param_obj["description"] = hint 

437 path_params.append(param_obj) 

438 

439 call_examples = self._extract_param_examples_from_call(raw_path, op.get("call")) 

440 for param in path_params: 

441 nm = param.get("name") 

442 if nm in call_examples: 

443 param["example"] = call_examples[nm] 

444 if "__" in call_examples[nm] and "description" not in param: 

445 param["description"] = "Multiple values can be provided separated by '__'." 

446 

447 return path_params 

448 

449 @staticmethod 

450 def _build_request_body(op: dict[str, str]) -> dict[str, object] | None: 

451 path_names = set(findall(PARAM_NAME, op.get("url", ""))) 

452 body_names = [name for name in findall(r"\[\[(\w+)\]\]", op.get("sparql", "")) if name not in path_names] 

453 if not body_names: 

454 return None 

455 properties = {name: {"type": "string"} for name in dict.fromkeys(body_names)} 

456 return { 

457 "required": True, 

458 "content": {"application/json": {"schema": {"type": "object", "properties": properties}}}, 

459 } 

460 

461 def _build_operation_object( 

462 self, 

463 op: dict[str, str], 

464 path_params: list[dict[str, object]], 

465 ctx: _OpenAPIBuildContext, 

466 method: str = "get", 

467 ) -> OrderedDict[str, object]: 

468 summary = self._parse_markup(op["description"].split("\n")[0]).text if op.get("description") else "" 

469 desc = self._clean_text(op.get("description")) or "" 

470 

471 row_schema = self._build_row_schema_from_field_type(op.get("field_type", "")) 

472 ok_example = self._try_parse_output_json(op.get("output_json")) 

473 if not row_schema["properties"] and ok_example is not None: 

474 ok_schema: dict[str, object] = self._infer_schema_from_value(ok_example) 

475 else: 

476 ok_schema = {"type": "array", "items": row_schema} 

477 

478 disabled_names = set(ctx.api_disabled) 

479 if "disable_params" in op: 

480 disabled_names |= parse_disable_params(op["disable_params"]) 

481 

482 if "format" in disabled_names: 

483 ok_content, err_content = self._build_single_format_response(op, ok_schema, ok_example) 

484 else: 

485 ok_content, err_content = self._build_response_content( 

486 ok_schema=ok_schema, 

487 declared_media_types=self._format_media_type_map(op), 

488 ok_example=ok_example, 

489 err_schema_ref="#/components/schemas/Error", 

490 ) 

491 

492 op_obj: OrderedDict[str, object] = OrderedDict() 

493 op_obj["tags"] = [ctx.tag_name] 

494 op_obj["summary"] = summary 

495 op_obj["description"] = desc 

496 custom_query_params = [] 

497 custom_names: set[str] = set() 

498 if "custom_params" in op: 

499 for name, conf in parse_custom_params(op["custom_params"]).items(): 

500 custom_names.add(name) 

501 param_obj: dict[str, object] = { 

502 "name": name, 

503 "in": "query", 

504 "required": False, 

505 "schema": {"type": "string"}, 

506 } 

507 if conf["description"]: 

508 param_obj["description"] = conf["description"] 

509 custom_query_params.append(param_obj) 

510 

511 suppressed = custom_names | disabled_names 

512 filtered_refs = [ref for ref in ctx.common_param_refs if ref["$ref"].rsplit("/", 1)[-1] not in suppressed] 

513 op_obj["parameters"] = path_params + custom_query_params + filtered_refs 

514 

515 if method in ("post", "put", "delete"): 

516 request_body = OpenAPIDocumentationHandler._build_request_body(op) 

517 if request_body is not None: 

518 op_obj["requestBody"] = request_body 

519 

520 responses: OrderedDict[str, object] = OrderedDict( 

521 [("200", {"description": "Successful response", "content": ok_content})], 

522 ) 

523 requires_auth = parse_auth(op["auth"]) if "auth" in op else ctx.api_auth 

524 if requires_auth: 

525 op_obj["security"] = [{"bearerAuth": []}] 

526 responses["401"] = {"description": "Unauthorized", "content": err_content} 

527 responses["default"] = {"description": "Error", "content": err_content} 

528 op_obj["responses"] = responses 

529 

530 return op_obj 

531 

532 def _server_url(self, api_meta: dict[str, str]) -> str: 

533 return f"{api_meta.get('base', '')}{api_meta.get('url', '')}" 

534 

535 def _build_openapi(self, base_url: str | None = None) -> OrderedDict[str, object]: 

536 conf = self._get_conf(base_url) 

537 api_meta = conf["conf_json"][0] 

538 formats_enum = self._collect_format_tokens(conf) 

539 

540 spec = OrderedDict() 

541 spec["openapi"] = "3.2.0" 

542 spec["info"] = self._build_info(api_meta) 

543 

544 spec["servers"] = [{"url": self._server_url(api_meta)}] 

545 

546 api_disabled = parse_disable_params(api_meta["disable_params"]) if "disable_params" in api_meta else set() 

547 api_auth = parse_auth(api_meta["auth"]) if "auth" in api_meta else False 

548 

549 all_common_params = self._build_common_parameters(formats_enum) 

550 active_common_params = {k: v for k, v in all_common_params.items() if k not in api_disabled} 

551 

552 components: dict[str, object] = { 

553 "schemas": { 

554 "Error": { 

555 "type": "object", 

556 "properties": {"error": {"type": "integer"}, "message": {"type": "string"}}, 

557 "required": ["error", "message"], 

558 "example": {"error": 404, "message": "HTTP status code 404: resource not found"}, 

559 }, 

560 }, 

561 "securitySchemes": { 

562 "bearerAuth": {"type": "http", "scheme": "bearer"}, 

563 }, 

564 } 

565 if active_common_params: 

566 components["parameters"] = active_common_params 

567 spec["components"] = components 

568 

569 common_param_refs = [{"$ref": f"#/components/parameters/{name}"} for name in active_common_params] 

570 

571 spec["paths"] = OrderedDict() 

572 tag_name = api_meta.get("title", "RAMOSE API") 

573 

574 ctx = _OpenAPIBuildContext( 

575 tag_name=tag_name, 

576 common_param_refs=common_param_refs, 

577 formats_enum=formats_enum, 

578 api_disabled=api_disabled, 

579 api_auth=api_auth, 

580 ) 

581 

582 for op in conf["conf_json"][1:]: 

583 raw_path = op.get("url", "") 

584 if raw_path not in spec["paths"]: 

585 spec["paths"][raw_path] = OrderedDict() 

586 

587 path_params = self._build_path_params(op, raw_path) 

588 

589 methods = [mm.lower() for mm in split(r"\s+", op.get("method", "get").strip()) if mm] 

590 for m in methods: 

591 spec["paths"][raw_path][m] = self._build_operation_object(op, path_params, ctx, m) 

592 

593 return spec 

594 

595 def _to_builtin(self, obj: object) -> object: 

596 if isinstance(obj, OrderedDict): 

597 obj = dict(obj) 

598 if isinstance(obj, dict): 

599 return {k: self._to_builtin(v) for k, v in obj.items()} 

600 if isinstance(obj, (list, tuple, set)): 

601 return [self._to_builtin(v) for v in obj] 

602 return obj 

603 

604 def _dump_yaml(self, spec: object) -> str: 

605 class _RamoseYamlDumper(yaml.SafeDumper): 

606 def ignore_aliases(self, data: object) -> bool: # noqa: ARG002 

607 return True 

608 

609 def _str_presenter(dumper: yaml.SafeDumper, data: str) -> yaml.ScalarNode: 

610 if "\n" in data: 

611 return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|") 

612 return dumper.represent_scalar("tag:yaml.org,2002:str", data) 

613 

614 _RamoseYamlDumper.add_representer(str, _str_presenter) 

615 return yaml.dump(spec, Dumper=_RamoseYamlDumper, sort_keys=False, allow_unicode=True) 

616 

617 def get_documentation(self, base_url: str | None = None, *_args: object, **_dargs: object) -> tuple[int, str]: 

618 spec = self._build_openapi(base_url=base_url) 

619 spec = self._to_builtin(spec) 

620 yml = self._dump_yaml(spec) 

621 return 200, yml 

622 

623 def get_swagger_ui(self, base_url: str | None = None, *_args: object, **_dargs: object) -> tuple[int, str]: 

624 spec = self._to_builtin(self._build_openapi(base_url=base_url)) 

625 spec_json = json.dumps(spec).replace("</", "<\\/") 

626 init_script = ( 

627 "SwaggerUIBundle({spec: " 

628 + spec_json 

629 + ", dom_id: '#swagger-ui', presets: [SwaggerUIBundle.presets.apis]});" 

630 ) 

631 server_url = self._server_url(self._get_conf(base_url)["conf_json"][0]) 

632 base_tag = f"<base href='{server_url}'>" if server_url.startswith(("http://", "https://")) else "" 

633 return 200, ( 

634 "<!DOCTYPE html><html lang='en'><head><meta charset='UTF-8'>" 

635 + base_tag 

636 + "<style>" 

637 + _read_swagger_asset("swagger-ui.css") 

638 + SWAGGER_MARKDOWN_CSS_FIX 

639 + "</style></head><body><div id='swagger-ui'></div><script>" 

640 + _read_swagger_asset("swagger-ui-bundle.js") 

641 + "</script><script>" 

642 + init_script 

643 + "</script></body></html>" 

644 ) 

645 

646 def store_documentation( 

647 self, file_path: str, base_url: str | None = None, *_args: object, **_dargs: object 

648 ) -> None: 

649 yml = self.get_documentation(base_url=base_url)[1] 

650 with Path(file_path).open("w", encoding="utf8") as f: 

651 f.write(yml) 

652 

653 def get_index(self, *_args: object, **_dargs: object) -> str: 

654 return "OpenAPI exporter available."