Coverage for ramose / hash_format.py: 99%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-01 13:49 +0000

1# SPDX-FileCopyrightText: 2018-2021 Silvio Peroni <silvio.peroni@unibo.it> 

2# SPDX-FileCopyrightText: 2020-2021 Marilena Daquino <marilena.daquino2@unibo.it> 

3# SPDX-FileCopyrightText: 2022 Davide Brembilla 

4# SPDX-FileCopyrightText: 2024 Ivan Heibi <ivan.heibi2@unibo.it> 

5# SPDX-FileCopyrightText: 2025 Sergei Slinkin 

6# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

7# 

8# SPDX-License-Identifier: ISC 

9 

10from collections.abc import Mapping 

11from pathlib import Path 

12from re import DOTALL, search 

13 

14import yaml 

15 

# Parameter names that parse_disable_params() expands the "*" wildcard to.
BUILTIN_PARAMS = frozenset(
    (
        "require",
        "filter",
        "sort",
        "format",
        "json",
        "page",
        "page_size",
    )
)
# Phase keywords recognised as the third field of a custom parameter entry.
CUSTOM_PARAM_PHASES = frozenset(("preprocess", "postprocess"))
# File suffixes (compared lower-cased) that make read_spec_file() pick the YAML reader.
YAML_SPEC_SUFFIXES = frozenset((".yaml", ".yml"))

19 

20 

def parse_disable_params(raw: str) -> set[str]:
    """Return the set of parameter names listed in a comma-separated string.

    A value of ``*`` (ignoring surrounding whitespace) yields a fresh mutable
    copy of ``BUILTIN_PARAMS``. Otherwise each comma-separated item is
    stripped and empty items are dropped.
    """
    spec = raw.strip()
    if spec == "*":
        return set(BUILTIN_PARAMS)
    candidates = (piece.strip() for piece in spec.split(","))
    return {candidate for candidate in candidates if candidate}

26 

27 

def parse_auth(raw: str) -> bool:
    """Return True only when the value, stripped of whitespace, is exactly ``required``."""
    normalized = raw.strip()
    return normalized == "required"

30 

31 

def _is_yaml_handler(handler: str) -> bool:
    """Tell whether a handler reference ends with ``.yaml`` or ``.yml`` (case-sensitive)."""
    return any(handler.endswith(suffix) for suffix in (".yaml", ".yml"))

34 

35 

def parse_custom_params(raw: str) -> dict[str, dict[str, str]]:
    """Parse a custom-parameter declaration into a mapping keyed by parameter name.

    ``raw`` holds ``;``-separated entries; blank entries are skipped. Each entry
    is either ``name,handler,phase,description`` or ``name,handler,description``.
    When the third field is not one of ``CUSTOM_PARAM_PHASES`` it is taken as the
    beginning of the description and the phase defaults to ``"preprocess"``.

    Returns:
        A dict mapping each stripped parameter name to a dict with the keys
        ``"handler"``, ``"phase"`` and ``"description"``.

    Raises:
        ValueError: if an entry has fewer than three comma-separated fields, or
            if a YAML handler is declared with the ``postprocess`` phase.
    """
    result: dict[str, dict[str, str]] = {}
    for raw_part in raw.split(";"):
        part = raw_part.strip()
        if not part:
            continue
        # Split at most three times so commas inside the description are preserved.
        fields = part.split(",", 3)
        if len(fields) < 3:
            # Report the offending entry instead of a bare tuple-unpacking error.
            msg = (
                f"Malformed custom parameter entry '{part}': "
                "expected 'name,handler,phase,description' or 'name,handler,description'"
            )
            raise ValueError(msg)
        name, handler, phase_or_description, *desc_parts = fields
        handler = handler.strip()
        phase = phase_or_description.strip()
        if _is_yaml_handler(handler):
            if phase == "postprocess":
                msg = f"YAML custom parameter handler '{handler}' cannot be used for postprocess"
                raise ValueError(msg)
        if phase not in CUSTOM_PARAM_PHASES:
            # No explicit phase: the third field belongs to the description, so
            # glue it back onto whatever followed it.
            desc_parts = [",".join([phase_or_description, *desc_parts])]
            phase = "preprocess"
        result[name.strip()] = {
            "handler": handler,
            "phase": phase,
            "description": desc_parts[0].strip() if desc_parts else "",
        }
    return result

58 

59 

60class HashFormatHandler: 

61 """This class creates an object capable to read files stored in Hash Format (see 

62 https://github.com/opencitations/ramose#Hashformat-configuration-file). A Hash Format 

63 file (.hf) is a specification file that includes information structured using the following 

64 syntax: 

65 

66 ``` 

67 #<field_name_1> <field_value_1> 

68 #<field_name_1> <field_value_2> 

69 #<field_name_3> <field_value_3> 

70 [...] 

71 #<field_name_n> <field_value_n> 

72 ```""" 

73 

74 @staticmethod 

75 def _process_field_line( 

76 cur_field_name: str, 

77 cur_field_content: str, 

78 first_field_name: str | None, 

79 cur_object: dict[str, str], 

80 result: list[dict[str, str]], 

81 ) -> tuple[str, dict[str, str]]: 

82 if first_field_name is None: 

83 first_field_name = cur_field_name 

84 if cur_field_name == first_field_name: 

85 if cur_object: 

86 result.append(cur_object) 

87 cur_object = {} 

88 cur_object[cur_field_name] = cur_field_content 

89 return first_field_name, cur_object 

90 

91 def read(self, file_path: str) -> list[dict[str, str]]: 

92 """This method takes in input a path of a file containing a document specified in 

93 Hash Format, and returns its representation as list of dictionaries.""" 

94 result: list[dict[str, str]] = [] 

95 

96 with Path(file_path).open(newline=None) as f: 

97 first_field_name = None 

98 cur_object: dict[str, str] = {} 

99 cur_field_name = None 

100 for line in f: 

101 cur_matching = search(r"^#([^\s]+)\s(.+)$", line, DOTALL) 

102 if cur_matching is not None: 

103 cur_field_name = cur_matching.group(1) 

104 cur_field_content = cur_matching.group(2) 

105 if cur_field_name and cur_field_content: 

106 first_field_name, cur_object = HashFormatHandler._process_field_line( 

107 cur_field_name, cur_field_content, first_field_name, cur_object, result 

108 ) 

109 elif cur_object and cur_field_name is not None: 

110 cur_object[cur_field_name] += line 

111 

112 if cur_object: 

113 result.append(cur_object) 

114 

115 for item in result: 

116 for key in item: 

117 item[key] = item[key].rstrip() 

118 

119 return result 

120 

121 

class YAMLSpecHandler:
    """Reader for specification files written as a YAML list of sections."""

    def read(self, file_path: str) -> list[dict[str, str]]:
        """Load the YAML document at ``file_path`` and return its sections.

        An empty document yields an empty list. A top-level value that is not
        a list raises ``TypeError``. Sections are numbered from 1 in error
        messages.
        """
        text = Path(file_path).read_text(encoding="utf-8")
        loaded: object = yaml.safe_load(text)
        if loaded is None:
            return []
        if isinstance(loaded, list):
            return [self._normalize_section(position, entry) for position, entry in enumerate(loaded, 1)]
        msg = "YAML spec must be a list of sections"
        raise TypeError(msg)

    @staticmethod
    def _normalize_section(section_number: int, section: object) -> dict[str, str]:
        """Validate one section and return it as a plain ``dict`` of strings.

        The section must be a mapping whose keys and values are all strings;
        anything else raises ``TypeError``. Trailing whitespace is removed
        from every value.
        """
        if not isinstance(section, Mapping):
            msg = f"YAML spec section {section_number} must be a mapping"
            raise TypeError(msg)

        normalized: dict[str, str] = {}
        for field_name, field_value in section.items():
            if not isinstance(field_name, str):
                msg = f"YAML spec section {section_number} has a non-string key: {field_name!r}"
                raise TypeError(msg)
            if not isinstance(field_value, str):
                msg = f"YAML spec field '{field_name}' in section {section_number} must be a string"
                raise TypeError(msg)
            normalized[field_name] = field_value.rstrip()
        return normalized

152 

153 

def read_spec_file(file_path: str) -> list[dict[str, str]]:
    """Read a specification file, choosing the reader from the file suffix.

    Paths whose lower-cased suffix is in ``YAML_SPEC_SUFFIXES`` are read with
    ``YAMLSpecHandler``; every other path is read with ``HashFormatHandler``.
    """
    extension = Path(file_path).suffix.lower()
    reader = YAMLSpecHandler() if extension in YAML_SPEC_SUFFIXES else HashFormatHandler()
    return reader.read(file_path)

157 return HashFormatHandler().read(file_path)