Coverage for oc_ocdm / counter_handler / filesystem_counter_handler.py: 89%

135 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-28 18:52 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com> 

4# SPDX-FileCopyrightText: 2024 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8# -*- coding: utf-8 -*- 

9from __future__ import annotations 

10 

11import os 

12from shutil import copymode, move 

13from tempfile import mkstemp 

14from typing import TYPE_CHECKING 

15 

16if TYPE_CHECKING: 

17 from typing import BinaryIO, Tuple, List, Dict 

18 

19from oc_ocdm.counter_handler.counter_handler import CounterHandler 

20from oc_ocdm.support.support import is_string_empty 

21 

22 

class FilesystemCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores
    the counter values within the filesystem.

    Every counter file is a plain text file holding one counter per line: the (1-based)
    line number is the identifier of the counter and the line content is its value.
    Blank or missing lines are read as ``0``.
    """

    # NOTE(review): these two attributes are not referenced anywhere in this class; they are
    # left untouched in case external code still reads them.
    _initial_line_len: int = 3
    _trailing_char: str = " "

    def __init__(self, info_dir: str, supplier_prefix: str = "") -> None:
        """
        Constructor of the ``FilesystemCounterHandler`` class.

        :param info_dir: The path to the folder that does/will contain the counter values.
        :type info_dir: str
        :param supplier_prefix: The supplier prefix ``info_dir`` refers to. When a counter method
          is later called with a different prefix, the first occurrence of this value inside
          ``info_dir`` is replaced with the requested one in order to locate the counter file.
        :type supplier_prefix: str
        :raises ValueError: if ``info_dir`` is None or an empty string.
        """
        if info_dir is None or is_string_empty(info_dir):
            raise ValueError("info_dir parameter is required!")

        # Paths are built by plain string concatenation, so the folder must end with a separator.
        if not info_dir.endswith(os.sep):
            info_dir += os.sep

        self.info_dir: str = info_dir
        self.supplier_prefix: str = supplier_prefix
        self.datasets_dir: str = info_dir + 'datasets' + os.sep
        self.short_names: List[str] = ["an", "ar", "be", "br", "ci", "de", "id", "pl", "ra", "re", "rp"]
        self.metadata_short_names: List[str] = ["di"]
        self.info_files: Dict[str, str] = {key: ("info_file_" + key + ".txt")
                                           for key in self.short_names}
        self.prov_files: Dict[str, str] = {key: ("prov_file_" + key + ".txt")
                                           for key in self.short_names}

    def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
                    identifier: int = 1, supplier_prefix: str = "") -> None:
        """
        It allows to set the counter value of graph and provenance entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix used to locate the counter file
          (``None`` is treated as an empty string).
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer or ``identifier`` is less than or equal to zero.
        :raises KeyError: if ``entity_short_name`` is not one of the known short names.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        self._set_number(new_value, file_path, identifier)

    def set_counters_batch(self, updates: Dict[Tuple[str, str], Dict[int, int]], supplier_prefix: str) -> None:
        """
        Updates counters in batch for multiple files.

        :param updates: A dictionary where the key is a tuple ``(entity_short_name, prov_short_name)``
          and the value is a dictionary of line numbers (identifiers) to new counter values.
          Entries whose inner dictionary is empty leave the corresponding file untouched.
        :type updates: Dict[Tuple[str, str], Dict[int, int]]
        :param supplier_prefix: The supplier prefix used to locate the counter files
          (``None`` is treated as an empty string).
        :type supplier_prefix: str
        :raises ValueError: if any line number is less than or equal to zero or any new value is negative.
        :raises KeyError: if an ``entity_short_name`` is not one of the known short names.
        :return: None
        """
        for (entity_short_name, prov_short_name), file_updates in updates.items():
            file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
            self._set_numbers(file_path, file_updates)

    def _set_numbers(self, file_path: str, updates: Dict[int, int]) -> None:
        """
        Apply multiple counter updates to a single file with one read and one write.

        :param file_path: The path of the counter file (created if missing).
        :type file_path: str
        :param updates: A dictionary where the key is the line number (identifier)
          and the value is the new counter value. An empty dictionary is a no-op.
        :type updates: Dict[int, int]
        :raises ValueError: if a line number is less than or equal to zero or a new value is negative.
        :return: None
        """
        if not updates:
            # Nothing to write; max() below would fail on an empty dictionary.
            return

        # Validate everything before touching the file so that a bad entry cannot leave a
        # partially-applied batch behind. A non-positive line number would otherwise index
        # the list from its end and silently overwrite an unrelated counter.
        for line_number, new_value in updates.items():
            if line_number <= 0:
                raise ValueError("line_number must be a positive non-zero integer number!")
            if new_value < 0:
                raise ValueError("new_value must be a non negative integer!")

        lines = self._load_lines(file_path)

        # NOTE(review): the file is padded up to one line *beyond* the highest identifier,
        # unlike _set_number which pads exactly up to the requested line. This is kept
        # as-is so that the layout of the files written by this method does not change.
        missing = max(updates) + 1 - len(lines)
        if missing > 0:
            lines.extend(["\n"] * missing)

        for line_number, new_value in updates.items():
            lines[line_number - 1] = f"{new_value}\n"

        with open(file_path, 'w') as file:
            file.writelines(lines)

    def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to read the counter value of graph and provenance entities.

        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix used to locate the counter file
          (``None`` is treated as an empty string).
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :raises KeyError: if ``entity_short_name`` is not one of the known short names.
        :return: The requested counter value.
        """
        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        return self._read_number(file_path, identifier)

    def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix used to locate the counter file
          (``None`` is treated as an empty string).
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :raises KeyError: if ``entity_short_name`` is not one of the known short names.
        :return: The newly-updated (already incremented) counter value.
        """
        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        return self._add_number(file_path, identifier)

    def _get_counter_path(self, entity_short_name: str, prov_short_name: str, supplier_prefix: str) -> str:
        """Return the provenance counter file when ``prov_short_name`` is ``"se"``,
        the info counter file of the entity in every other case."""
        if prov_short_name == "se":
            return self._get_prov_path(entity_short_name, supplier_prefix)
        return self._get_info_path(entity_short_name, supplier_prefix)

    def _get_directory(self, supplier_prefix: str) -> str:
        """Return the folder that holds the counter files for ``supplier_prefix``.

        ``info_dir`` is returned unchanged when the handler has no supplier prefix of its own
        or when the requested prefix is the handler's one; otherwise the handler's prefix is
        swapped with the requested one inside ``info_dir``.
        """
        requested = "" if supplier_prefix is None else supplier_prefix
        if not self.supplier_prefix or requested == self.supplier_prefix:
            return self.info_dir
        # NOTE(review): only the first occurrence is replaced, wherever it appears in the path;
        # confirm that the prefix cannot also match an earlier, unrelated path component.
        return self.info_dir.replace(self.supplier_prefix, requested, 1)

    def _get_info_path(self, short_name: str, supplier_prefix: str) -> str:
        """Return the path of the info counter file of ``short_name``.

        :raises KeyError: if ``short_name`` is not one of the known short names.
        """
        return self._get_directory(supplier_prefix) + self.info_files[short_name]

    def _get_prov_path(self, short_name: str, supplier_prefix: str) -> str:
        """Return the path of the provenance counter file of ``short_name``.

        :raises KeyError: if ``short_name`` is not one of the known short names.
        """
        return self._get_directory(supplier_prefix) + self.prov_files[short_name]

    def _get_metadata_path(self, short_name: str, dataset_name: str) -> str:
        """Return the path of the metadata counter file of ``short_name`` for ``dataset_name``."""
        return self.datasets_dir + dataset_name + os.sep + 'metadata_' + short_name + '.txt'

    def _get_checked_metadata_path(self, entity_short_name: str, dataset_name: str) -> str:
        """Validate the arguments shared by the metadata counter methods and return the file path.

        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known
          metadata short name.
        """
        if dataset_name is None:
            raise ValueError("dataset_name must be provided!")

        if entity_short_name not in self.metadata_short_names:
            raise ValueError("entity_short_name is not a known metadata short name!")

        return self._get_metadata_path(entity_short_name, dataset_name)

    def __initialize_file_if_not_existing(self, file_path: str) -> None:
        """Create ``file_path`` (and its parent folders) with a single blank line if it is missing."""
        directory = os.path.dirname(file_path)
        if directory:
            # exist_ok avoids the check-then-create race of a separate os.path.exists() test.
            os.makedirs(directory, exist_ok=True)

        if not os.path.isfile(file_path):
            with open(file_path, 'w') as file:
                file.write("\n")

    def _load_lines(self, file_path: str) -> List[str]:
        """Return all the lines of ``file_path`` (created if missing), each one newline-terminated."""
        self.__initialize_file_if_not_existing(file_path)

        with open(file_path, 'r') as file:
            lines = file.readlines()

        # A last line without its newline would be glued to whatever gets appended after it,
        # merging two counters into one number when the file is written back.
        if lines and not lines[-1].endswith("\n"):
            lines[-1] += "\n"
        return lines

    def _read_number(self, file_path: str, line_number: int) -> int:
        """
        Read the counter stored at ``line_number`` of ``file_path``.

        :param file_path: The path of the counter file (created if missing).
        :type file_path: str
        :param line_number: The 1-based line that holds the counter.
        :type line_number: int
        :raises ValueError: if ``line_number`` is less than or equal to zero.
        :return: The stored value, or ``0`` when the line is missing, blank or not a valid integer.
          I/O errors are not masked: they propagate to the caller.
        """
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        self.__initialize_file_if_not_existing(file_path)

        try:
            with open(file_path, 'r') as file:
                # Stream the file so that only the lines up to the requested one are read.
                for index, line in enumerate(file, 1):
                    if index == line_number:
                        content = line.strip()
                        return int(content) if content else 0
        except ValueError as error:
            # Best effort: unreadable content counts as "no value", but it gets reported.
            # Imported here because this is the only place that needs it.
            import logging
            logging.getLogger(__name__).warning(
                "Invalid counter value at line %d of '%s' (%s): assuming 0",
                line_number, file_path, error)

        # The requested line does not exist yet (or was malformed): the counter was never set.
        return 0

    def _add_number(self, file_path: str, line_number: int = 1) -> int:
        """
        Increment by one unit the counter stored at ``line_number`` of ``file_path``.

        :param file_path: The path of the counter file (created if missing).
        :type file_path: str
        :param line_number: The 1-based line that holds the counter.
        :type line_number: int
        :raises ValueError: if ``line_number`` is less than or equal to zero.
        :return: The newly-updated (already incremented) counter value.
        """
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        # Both calls below take care of creating the file when it is missing.
        new_value = self._read_number(file_path, line_number) + 1
        self._set_number(new_value, file_path, line_number)
        return new_value

    def _set_number(self, new_value: int, file_path: str, line_number: int = 1) -> None:
        """
        Store ``new_value`` at ``line_number`` of ``file_path``, leaving the other lines untouched.

        :param new_value: The new counter value to be set.
        :type new_value: int
        :param file_path: The path of the counter file (created if missing).
        :type file_path: str
        :param line_number: The 1-based line that holds the counter.
        :type line_number: int
        :raises ValueError: if ``new_value`` is negative or ``line_number`` is less than or equal to zero.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        lines = self._load_lines(file_path)

        # Ensure the file has enough lines
        missing = line_number - len(lines)
        if missing > 0:
            lines.extend(["\n"] * missing)

        # Update the specific line
        lines[line_number - 1] = f"{new_value}\n"

        # Write back to the file
        with open(file_path, 'w') as file:
            file.writelines(lines)

    def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None:
        """
        It allows to set the counter value of metadata entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``new_value`` is a negative integer, ``dataset_name`` is None or
          ``entity_short_name`` is not a known metadata short name.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        self._set_number(new_value, file_path, 1)

    def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to read the counter value of metadata entities.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The requested counter value.
        """
        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        return self._read_number(file_path, 1)

    def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to increment the counter value of metadata entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The newly-updated (already incremented) counter value.
        """
        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        return self._add_number(file_path, 1)