Coverage for oc_ocdm/counter_handler/filesystem_counter_handler.py: 88%

137 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-05-30 22:05 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import logging
import os
from shutil import copymode, move
from tempfile import mkstemp
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import BinaryIO, Tuple, List, Dict

from oc_ocdm.counter_handler.counter_handler import CounterHandler
from oc_ocdm.support.support import is_string_empty

28 

29 

class FilesystemCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores
    the counter values within the filesystem.

    Counters are kept in plain text files holding one integer per line: the (1-based) line
    number acts as the identifier of the counter. Empty or missing lines are read as 0.
    """

    _initial_line_len: int = 3
    _trailing_char: str = " "

    def __init__(self, info_dir: str, supplier_prefix: str = "") -> None:
        """
        Constructor of the ``FilesystemCounterHandler`` class.

        :param info_dir: The path to the folder that does/will contain the counter values.
        :type info_dir: str
        :param supplier_prefix: The supplier prefix contained in ``info_dir``. When a counter is
         requested for a different prefix, the first occurrence of this value inside ``info_dir``
         is replaced with the requested one in order to locate the counter files.
        :type supplier_prefix: str
        :raises ValueError: if ``info_dir`` is None or an empty string.
        """
        if info_dir is None or is_string_empty(info_dir):
            raise ValueError("info_dir parameter is required!")

        if not info_dir.endswith(os.sep):
            info_dir += os.sep

        self.info_dir: str = info_dir
        self.supplier_prefix: str = supplier_prefix
        self.datasets_dir: str = info_dir + 'datasets' + os.sep
        self.short_names: List[str] = ["an", "ar", "be", "br", "ci", "de", "id", "pl", "ra", "re", "rp"]
        self.metadata_short_names: List[str] = ["di"]
        self.info_files: Dict[str, str] = {key: ("info_file_" + key + ".txt")
                                           for key in self.short_names}
        self.prov_files: Dict[str, str] = {key: ("prov_file_" + key + ".txt")
                                           for key in self.short_names}

    def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
                    identifier: int = 1, supplier_prefix: str = "") -> None:
        """
        It allows to set the counter value of graph and provenance entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix selecting the folder of the counter file.
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer or ``identifier`` is less than or equal to zero.
        :raises KeyError: if ``entity_short_name`` is not a known short name.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        self._set_number(new_value, file_path, identifier)

    def set_counters_batch(self, updates: Dict[Tuple[str, str], Dict[int, int]], supplier_prefix: str = "") -> None:
        """
        Updates counters in batch for multiple files.

        :param updates: A dictionary where the key is a tuple ``(entity_short_name, prov_short_name)``
         and the value is a dictionary of line numbers (identifiers) to new counter values.
         Entries whose inner dictionary is empty are skipped.
        :type updates: Dict[Tuple[str, str], Dict[int, int]]
        :param supplier_prefix: The supplier prefix selecting the folder of the counter files.
        :type supplier_prefix: str
        :raises ValueError: if a line number is less than or equal to zero or a counter value is negative.
        :raises KeyError: if an ``entity_short_name`` is not a known short name.
        :return: None
        """
        for (entity_short_name, prov_short_name), file_updates in updates.items():
            file_path = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
            self._set_numbers(file_path, file_updates)

    def _set_numbers(self, file_path: str, updates: Dict[int, int]) -> None:
        """
        Apply multiple counter updates to a single file.

        The whole batch is validated before the file is touched, so an invalid entry
        leaves the file unchanged.

        :param file_path: The path of the counter file to be updated.
        :type file_path: str
        :param updates: A dictionary where the key is the line number (identifier)
         and the value is the new counter value. Nothing is done when it is empty.
        :type updates: Dict[int, int]
        :raises ValueError: if a line number is less than or equal to zero or a counter value is negative.
        :return: None
        """
        if not updates:
            return

        for line_number, new_value in updates.items():
            # A non-positive line number would be used as a negative list index
            # and silently overwrite a different counter.
            if line_number <= 0:
                raise ValueError("line_number must be a positive non-zero integer number!")
            if new_value < 0:
                raise ValueError("new_value must be a non negative integer!")

        # The file is padded up to one line past the highest identifier.
        self._store_values(file_path, updates, max(updates) + 1)

    def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to read the counter value of graph and provenance entities.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix selecting the folder of the counter file.
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :raises KeyError: if ``entity_short_name`` is not a known short name.
        :return: The requested counter value.
        """
        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        return self._read_number(file_path, identifier)

    def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix selecting the folder of the counter file.
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :raises KeyError: if ``entity_short_name`` is not a known short name.
        :return: The newly-updated (already incremented) counter value.
        """
        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        return self._add_number(file_path, identifier)

    def _get_counter_path(self, entity_short_name: str, prov_short_name: str, supplier_prefix: str) -> str:
        """Return the provenance counter file when ``prov_short_name`` is ``"se"``,
        the info counter file in every other case."""
        if prov_short_name == "se":
            return self._get_prov_path(entity_short_name, supplier_prefix)
        return self._get_info_path(entity_short_name, supplier_prefix)

    def _resolve_dir(self, supplier_prefix: str) -> str:
        """Return the folder holding the counter files of ``supplier_prefix`` (``None`` means ``""``)."""
        supplier_prefix = "" if supplier_prefix is None else supplier_prefix
        if not self.supplier_prefix or supplier_prefix == self.supplier_prefix:
            return self.info_dir
        # Only the first occurrence of the configured prefix is swapped with the requested one.
        return self.info_dir.replace(self.supplier_prefix, supplier_prefix, 1)

    def _get_info_path(self, short_name: str, supplier_prefix: str) -> str:
        """Return the path of the info counter file of ``short_name`` (``KeyError`` if unknown)."""
        return self._resolve_dir(supplier_prefix) + self.info_files[short_name]

    def _get_prov_path(self, short_name: str, supplier_prefix: str) -> str:
        """Return the path of the provenance counter file of ``short_name`` (``KeyError`` if unknown)."""
        return self._resolve_dir(supplier_prefix) + self.prov_files[short_name]

    def _get_metadata_path(self, short_name: str, dataset_name: str) -> str:
        """Return the path of the metadata counter file of ``short_name`` for ``dataset_name``."""
        return self.datasets_dir + dataset_name + os.sep + 'metadata_' + short_name + '.txt'

    def _get_checked_metadata_path(self, entity_short_name: str, dataset_name: str) -> str:
        """Validate the arguments of the metadata methods and return the counter file path.

        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known
         metadata short name.
        """
        if dataset_name is None:
            raise ValueError("dataset_name must be provided!")

        if entity_short_name not in self.metadata_short_names:
            raise ValueError("entity_short_name is not a known metadata short name!")

        return self._get_metadata_path(entity_short_name, dataset_name)

    def __initialize_file_if_not_existing(self, file_path: str) -> None:
        """Create the parent folders of ``file_path`` and, if the file is missing,
        the file itself containing a single empty line."""
        directory = os.path.dirname(file_path)
        if directory:
            # exist_ok avoids failing when the folder is created between a check and the call.
            os.makedirs(directory, exist_ok=True)

        if not os.path.isfile(file_path):
            with open(file_path, 'w') as file:
                file.write("\n")

    def _store_values(self, file_path: str, values: Dict[int, int], min_lines: int) -> None:
        """Rewrite ``file_path`` so that every (1-based, already validated) line number
        in ``values`` holds its new number.

        The file is created if missing and padded with empty lines up to ``min_lines`` lines.
        """
        self.__initialize_file_if_not_existing(file_path)

        with open(file_path, 'r') as file:
            lines = file.readlines()

        # Without its newline the last line would be merged with the first padding
        # line, shifting every following counter one line up.
        if lines and not lines[-1].endswith("\n"):
            lines[-1] += "\n"

        # Multiplying by a non-positive number yields an empty list: no padding needed.
        lines.extend(["\n"] * (min_lines - len(lines)))

        for line_number, value in values.items():
            lines[line_number - 1] = f"{value}\n"

        with open(file_path, 'w') as file:
            file.writelines(lines)

    def _read_number(self, file_path: str, line_number: int) -> int:
        """
        Read the counter stored at line ``line_number`` of ``file_path``.

        The file (and its folders) are created if missing. A missing line, an empty line,
        a line that is not an integer or an I/O error all yield 0; the last two cases are logged.

        :raises ValueError: if ``line_number`` is less than or equal to zero.
        :return: The counter value, 0 when it is not available.
        """
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        self.__initialize_file_if_not_existing(file_path)

        logger = logging.getLogger(__name__)
        try:
            with open(file_path, 'r') as file:
                for i, line in enumerate(file, 1):
                    if i == line_number:
                        content = line.strip()
                        return int(content) if content else 0
        except ValueError as e:
            logger.warning("Invalid counter at line %d of '%s' (%s): 0 is returned",
                           line_number, file_path, e)
        except OSError:
            logger.exception("Unable to read the counter at line %d of '%s': 0 is returned",
                             line_number, file_path)
        return 0

    def _add_number(self, file_path: str, line_number: int = 1) -> int:
        """
        Increment by one the counter stored at line ``line_number`` of ``file_path``.

        :raises ValueError: if ``line_number`` is less than or equal to zero.
        :return: The incremented counter value.
        """
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        new_value = self._read_number(file_path, line_number) + 1
        self._set_number(new_value, file_path, line_number)
        return new_value

    def _set_number(self, new_value: int, file_path: str, line_number: int = 1) -> None:
        """
        Store ``new_value`` at line ``line_number`` of ``file_path``, creating the file
        and padding it with empty lines when needed.

        :raises ValueError: if ``new_value`` is negative or ``line_number`` is less than or equal to zero.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        self._store_values(file_path, {line_number: new_value}, line_number)

    def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None:
        """
        It allows to set the counter value of metadata entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``new_value`` is a negative integer, ``dataset_name`` is None or
         ``entity_short_name`` is not a known metadata short name.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        self._set_number(new_value, file_path, 1)

    def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to read the counter value of metadata entities.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The requested counter value.
        """
        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        return self._read_number(file_path, 1)

    def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to increment the counter value of metadata entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The newly-updated (already incremented) counter value.
        """
        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        return self._add_number(file_path, 1)