Coverage for ramose / hash_format.py: 99%
97 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-01 13:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-01 13:49 +0000
1# SPDX-FileCopyrightText: 2018-2021 Silvio Peroni <silvio.peroni@unibo.it>
2# SPDX-FileCopyrightText: 2020-2021 Marilena Daquino <marilena.daquino2@unibo.it>
3# SPDX-FileCopyrightText: 2022 Davide Brembilla
4# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it>
5# SPDX-FileCopyrightText: 2025 Sergei Slinkin
6# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
7#
8# SPDX-License-Identifier: ISC
10from collections.abc import Mapping
11from pathlib import Path
12from re import DOTALL, search
14import yaml
# Names of the built-in parameters; parse_disable_params("*") expands to a copy of this set.
BUILTIN_PARAMS = frozenset(("require", "filter", "sort", "format", "json", "page", "page_size"))
# Phase names accepted as the third field of a custom parameter definition
# (see parse_custom_params); anything else is treated as description text.
CUSTOM_PARAM_PHASES = frozenset(("preprocess", "postprocess"))
# Lower-cased file suffixes that read_spec_file routes to YAMLSpecHandler.
YAML_SPEC_SUFFIXES = frozenset((".yaml", ".yml"))
def parse_disable_params(raw: str) -> set[str]:
    """Parse a comma-separated list of parameter names to disable.

    Surrounding whitespace is ignored, both around the whole value and around
    each name, and empty entries are skipped. The single value ``*`` is a
    wildcard that yields a mutable copy of ``BUILTIN_PARAMS``.
    """
    spec = raw.strip()
    if spec != "*":
        # Keep only the names that are non-empty once stripped.
        return {token for piece in spec.split(",") if (token := piece.strip())}
    return set(BUILTIN_PARAMS)
def parse_auth(raw: str) -> bool:
    """Return True only when *raw*, ignoring surrounding whitespace, is exactly ``required``.

    The comparison is case-sensitive; every other value yields False.
    """
    setting = raw.strip()
    return setting == "required"
def _is_yaml_handler(handler: str) -> bool:
    """Return True when *handler* ends with ``.yaml`` or ``.yml`` (case-sensitive)."""
    return any(handler.endswith(suffix) for suffix in (".yaml", ".yml"))
def parse_custom_params(raw: str) -> dict[str, dict[str, str]]:
    """Parse a ``;``-separated list of custom parameter definitions.

    Each non-empty definition has the form
    ``<name>,<handler>,<phase>,<description>`` or
    ``<name>,<handler>,<description>``. When the third field is not one of
    ``CUSTOM_PARAM_PHASES`` it is taken as the start of the description
    (commas included) and the phase defaults to ``preprocess``.

    Returns:
        A mapping from the stripped parameter name to a dictionary with the
        keys ``handler``, ``phase`` and ``description``. A name that appears
        more than once keeps its last definition.

    Raises:
        ValueError: if a definition has fewer than three comma-separated
            fields, or if a YAML handler (see ``_is_yaml_handler``) is
            declared for the ``postprocess`` phase.
    """
    result: dict[str, dict[str, str]] = {}
    for raw_part in raw.split(";"):
        part = raw_part.strip()
        if not part:
            continue
        # At most four fields: everything after the third comma is description text.
        fields = part.split(",", 3)
        if len(fields) < 3:
            # Report the offending definition instead of letting tuple
            # unpacking fail with "not enough values to unpack".
            msg = (
                f"Invalid custom parameter definition '{part}': expected "
                "'<name>,<handler>,<phase or description>[,<description>]'"
            )
            raise ValueError(msg)
        name, handler, phase_or_description, *desc_parts = fields
        handler = handler.strip()
        phase = phase_or_description.strip()
        if _is_yaml_handler(handler):
            if phase == "postprocess":
                msg = f"YAML custom parameter handler '{handler}' cannot be used for postprocess"
                raise ValueError(msg)
        if phase not in CUSTOM_PARAM_PHASES:
            # No explicit phase: the third field belongs to the description,
            # so glue it back together with whatever followed it.
            desc_parts = [",".join([phase_or_description, *desc_parts])]
            phase = "preprocess"
        result[name.strip()] = {
            "handler": handler,
            "phase": phase,
            "description": desc_parts[0].strip() if desc_parts else "",
        }
    return result
60class HashFormatHandler:
61 """This class creates an object capable to read files stored in Hash Format (see
62 https://github.com/opencitations/ramose#Hashformat-configuration-file). A Hash Format
63 file (.hf) is a specification file that includes information structured using the following
64 syntax:
66 ```
67 #<field_name_1> <field_value_1>
68 #<field_name_1> <field_value_2>
69 #<field_name_3> <field_value_3>
70 [...]
71 #<field_name_n> <field_value_n>
72 ```"""
74 @staticmethod
75 def _process_field_line(
76 cur_field_name: str,
77 cur_field_content: str,
78 first_field_name: str | None,
79 cur_object: dict[str, str],
80 result: list[dict[str, str]],
81 ) -> tuple[str, dict[str, str]]:
82 if first_field_name is None:
83 first_field_name = cur_field_name
84 if cur_field_name == first_field_name:
85 if cur_object:
86 result.append(cur_object)
87 cur_object = {}
88 cur_object[cur_field_name] = cur_field_content
89 return first_field_name, cur_object
91 def read(self, file_path: str) -> list[dict[str, str]]:
92 """This method takes in input a path of a file containing a document specified in
93 Hash Format, and returns its representation as list of dictionaries."""
94 result: list[dict[str, str]] = []
96 with Path(file_path).open(newline=None) as f:
97 first_field_name = None
98 cur_object: dict[str, str] = {}
99 cur_field_name = None
100 for line in f:
101 cur_matching = search(r"^#([^\s]+)\s(.+)$", line, DOTALL)
102 if cur_matching is not None:
103 cur_field_name = cur_matching.group(1)
104 cur_field_content = cur_matching.group(2)
105 if cur_field_name and cur_field_content:
106 first_field_name, cur_object = HashFormatHandler._process_field_line(
107 cur_field_name, cur_field_content, first_field_name, cur_object, result
108 )
109 elif cur_object and cur_field_name is not None:
110 cur_object[cur_field_name] += line
112 if cur_object:
113 result.append(cur_object)
115 for item in result:
116 for key in item:
117 item[key] = item[key].rstrip()
119 return result
class YAMLSpecHandler:
    """Reader for specification files written as a YAML list of sections."""

    def read(self, file_path: str) -> list[dict[str, str]]:
        """Load the UTF-8 YAML file at *file_path* and return its sections.

        An empty document yields an empty list. Otherwise the document must be
        a list; each entry is validated by ``_normalize_section``, numbered
        from 1 in document order.

        Raises:
            TypeError: if the document is not a list, or a section is invalid.
        """
        document: object = yaml.safe_load(Path(file_path).read_text(encoding="utf-8"))
        if document is None:
            return []
        if isinstance(document, list):
            return [
                self._normalize_section(position, entry)
                for position, entry in enumerate(document, 1)
            ]
        raise TypeError("YAML spec must be a list of sections")

    @staticmethod
    def _normalize_section(section_number: int, section: object) -> dict[str, str]:
        """Validate one section and return it with trailing whitespace stripped from values.

        *section_number* is only used to build error messages.

        Raises:
            TypeError: if *section* is not a mapping, or any key or value in it
                is not a string.
        """
        if not isinstance(section, Mapping):
            raise TypeError(f"YAML spec section {section_number} must be a mapping")

        cleaned: dict[str, str] = {}
        for key, text in section.items():
            # The key is checked before the value so that the key error wins
            # when both are of the wrong type.
            if not isinstance(key, str):
                raise TypeError(f"YAML spec section {section_number} has a non-string key: {key!r}")
            if not isinstance(text, str):
                raise TypeError(f"YAML spec field '{key}' in section {section_number} must be a string")
            cleaned[key] = text.rstrip()
        return cleaned
def read_spec_file(file_path: str) -> list[dict[str, str]]:
    """Read a specification file, choosing the parser from the file suffix.

    A suffix in ``YAML_SPEC_SUFFIXES`` (compared case-insensitively) selects
    ``YAMLSpecHandler``; every other file is read with ``HashFormatHandler``.
    """
    extension = Path(file_path).suffix.lower()
    handler = YAMLSpecHandler() if extension in YAML_SPEC_SUFFIXES else HashFormatHandler()
    return handler.read(file_path)