Coverage for oc_ocdm / counter_handler / filesystem_counter_handler.py: 95%

147 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-08 20:23 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com> 

4# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8# -*- coding: utf-8 -*- 

9from __future__ import annotations 

10 

11import os 

12from typing import TYPE_CHECKING 

13 

14if TYPE_CHECKING: 

15 from typing import Dict, List, Tuple 

16 

17from oc_ocdm.counter_handler.counter_handler import CounterHandler 

18from oc_ocdm.support.support import is_string_empty 

19 

20 

class FilesystemCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores the counter values within the filesystem.

    Counter data is kept in RAM and written back to disk only when ``flush()`` is called.
    The ``info_file_*.txt`` / ``prov_file_*.txt`` files of a supplier prefix directory are
    loaded together the first time that prefix is accessed; any other counter file (such as
    the per-dataset metadata files) is loaded lazily the first time it is accessed.

    On disk, every counter file holds one counter per line: line ``N`` stores the counter
    identified by ``N``, and an empty line stands for the value ``0``.
    """

    def __init__(self, info_dir: str, supplier_prefix: str = "") -> None:
        """
        Constructor of the ``FilesystemCounterHandler`` class.

        :param info_dir: The path to the folder that does/will contain the counter values.
        :type info_dir: str
        :param supplier_prefix: The default supplier prefix. When it is not empty, the directory of
          another prefix is derived by replacing its first occurrence inside ``info_dir``.
        :type supplier_prefix: str
        :raises ValueError: if ``info_dir`` is None or an empty string.
        """
        if info_dir is None or is_string_empty(info_dir):
            raise ValueError("info_dir parameter is required!")

        # Paths are built by plain string concatenation, hence the trailing separator.
        if not info_dir.endswith(os.sep):
            info_dir += os.sep

        self.info_dir: str = info_dir
        self.supplier_prefix: str = supplier_prefix
        self.datasets_dir: str = info_dir + "datasets" + os.sep
        self.short_names: List[str] = ["an", "ar", "be", "br", "ci", "de", "id", "pl", "ra", "re", "rp"]
        self.metadata_short_names: List[str] = ["di"]
        self.info_files: Dict[str, str] = {key: ("info_file_" + key + ".txt") for key in self.short_names}
        self.prov_files: Dict[str, str] = {key: ("prov_file_" + key + ".txt") for key in self.short_names}

        # file path -> counter values (index 0 holds line number 1)
        self._cache: Dict[str, List[int]] = {}
        # file paths whose cached values still have to be written by flush()
        self._dirty: set[str] = set()
        # prefix directories that have already been scanned by _ensure_loaded()
        self._loaded_dirs: set[str] = set()

        self._ensure_loaded(supplier_prefix)

    def _get_prefix_dir(self, supplier_prefix: str) -> str:
        """
        Return the directory (with trailing separator) holding the counters of ``supplier_prefix``.

        ``info_dir`` itself is returned when the requested prefix is the default one or when no
        default prefix was configured. Otherwise the first occurrence of the default prefix inside
        ``info_dir`` is replaced with the requested one.
        NOTE(review): this assumes the default prefix does not also occur earlier in the path
        (e.g. in a parent folder name) -- confirm against the directory layout in use.
        """
        sp = "" if supplier_prefix is None else supplier_prefix
        if sp == self.supplier_prefix or not self.supplier_prefix:
            return self.info_dir
        return self.info_dir.replace(self.supplier_prefix, sp, 1)

    @staticmethod
    def _read_counter_file(file_path: str) -> List[int]:
        """
        Parse a counter file into a list of integers, one entry per line.

        Blank (or whitespace-only) lines are read as ``0``.

        :raises ValueError: if a non-blank line is not a valid integer.
        """
        values = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                text = line.strip()
                values.append(int(text) if text else 0)
        return values

    def _get_cache_list(self, file_path: str) -> List[int]:
        """
        Return the cached counter list of ``file_path``, loading it on first access.

        On a cache miss the file is read from disk when it exists, so that values persisted by a
        previous ``flush()`` (e.g. metadata counters, which are not covered by the per-directory
        scan) are not silently replaced by zeros. A missing file yields a fresh ``[0]`` list.
        """
        cache_list = self._cache.get(file_path)
        if cache_list is None:
            cache_list = self._read_counter_file(file_path) if os.path.isfile(file_path) else [0]
            self._cache[file_path] = cache_list
        return cache_list

    def _ensure_loaded(self, supplier_prefix: str) -> None:
        """
        Load into the cache every ``info_file_*.txt`` / ``prov_file_*.txt`` file found in the
        directory of ``supplier_prefix``. Each directory is scanned at most once; a directory
        that does not exist yet is simply remembered as loaded.
        """
        prefix_dir = self._get_prefix_dir(supplier_prefix)
        if prefix_dir in self._loaded_dirs:
            return
        if os.path.isdir(prefix_dir):
            for filename in os.listdir(prefix_dir):
                if not filename.endswith(".txt"):
                    continue
                if not filename.startswith(("info_file_", "prov_file_")):
                    continue
                filepath = prefix_dir + filename
                # Never overwrite values that are already in memory and possibly not flushed yet.
                if filepath not in self._cache:
                    self._cache[filepath] = self._read_counter_file(filepath)
        self._loaded_dirs.add(prefix_dir)

    def flush(self) -> None:
        """
        Write every modified counter file back to disk, creating missing folders as needed.

        A counter equal to ``0`` is written as an empty line. The set of modified files is
        cleared only after all of them have been written.
        """
        for file_path in self._dirty:
            # exist_ok avoids the check-then-create race of a separate os.path.exists() test.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            cache_list = self._cache[file_path]
            with open(file_path, "w", encoding="utf-8") as f:
                f.writelines(f"{v}\n" if v else "\n" for v in cache_list)
        self._dirty.clear()

    def _get_counter_path(self, entity_short_name: str, prov_short_name: str, supplier_prefix: str) -> str:
        """
        Return the path of the file storing the requested counter: the provenance file when
        ``prov_short_name`` is ``"se"``, the info file of the entity type otherwise.
        """
        if prov_short_name == "se":
            return self._get_prov_path(entity_short_name, supplier_prefix)
        return self._get_info_path(entity_short_name, supplier_prefix)

    def set_counter(
        self,
        new_value: int,
        entity_short_name: str,
        prov_short_name: str = "",
        identifier: int = 1,
        supplier_prefix: str = "",
    ) -> None:
        """
        It allows to set the counter value of graph and provenance entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix whose counter files must be used.
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer or ``identifier`` is less than or equal to zero.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")
        self._ensure_loaded(supplier_prefix)
        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        self._set_number(new_value, file_path, identifier)

    def set_counters_batch(self, updates: Dict[Tuple[str, str], Dict[int, int]], supplier_prefix: str) -> None:
        """
        Updates counters in batch for multiple files.
        `updates` is a dictionary where the key is a tuple (entity_short_name, prov_short_name)
        and the value is a dictionary of line numbers to new counter values.

        :raises ValueError: if any line number is less than or equal to zero or any new value is negative.
        """
        self._ensure_loaded(supplier_prefix)
        for (entity_short_name, prov_short_name), file_updates in updates.items():
            file_path = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
            self._set_numbers(file_path, file_updates)

    def _set_numbers(self, file_path: str, updates: Dict[int, int]) -> None:
        """
        Apply multiple counter updates to a single file.
        `updates` is a dictionary where the key is the line number (identifier)
        and the value is the new counter value. An empty dictionary is a no-op.

        :raises ValueError: if any line number is less than or equal to zero or any new value is
          negative. Nothing is modified in that case.
        """
        if not updates:
            return
        # Validate everything up front: a line number <= 0 would otherwise index the list from
        # its end and silently overwrite an unrelated counter.
        for line_number, new_value in updates.items():
            if line_number <= 0:
                raise ValueError("line_number must be a positive non-zero integer number!")
            if new_value < 0:
                raise ValueError("new_value must be a non negative integer!")
        cache_list = self._get_cache_list(file_path)
        # The original sizing (highest line number + 1) is kept on purpose, so that files
        # flushed after a batch update keep the same number of lines as before.
        needed = max(updates) + 1 - len(cache_list)
        if needed > 0:
            cache_list.extend([0] * needed)
        for line_number, new_value in updates.items():
            cache_list[line_number - 1] = new_value
        self._dirty.add(file_path)

    def read_counter(
        self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = ""
    ) -> int:
        """
        It allows to read the counter value of graph and provenance entities.

        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix whose counter files must be used.
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :return: The requested counter value.
        """
        self._ensure_loaded(supplier_prefix)
        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        return self._read_number(file_path, identifier)

    def increment_counter(
        self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = ""
    ) -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix whose counter files must be used.
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :return: The newly-updated (already incremented) counter value.
        """
        self._ensure_loaded(supplier_prefix)
        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        return self._add_number(file_path, identifier)

    def _get_info_path(self, short_name: str, supplier_prefix: str) -> str:
        """Return the path of the info file of ``short_name`` for the given supplier prefix."""
        return self._get_prefix_dir(supplier_prefix) + self.info_files[short_name]

    def _get_prov_path(self, short_name: str, supplier_prefix: str) -> str:
        """Return the path of the provenance file of ``short_name`` for the given supplier prefix."""
        return self._get_prefix_dir(supplier_prefix) + self.prov_files[short_name]

    def _get_metadata_path(self, short_name: str, dataset_name: str) -> str:
        """Return the path of the metadata counter file of ``short_name`` for ``dataset_name``."""
        return self.datasets_dir + dataset_name + os.sep + "metadata_" + short_name + ".txt"

    def _read_number(self, file_path: str, line_number: int) -> int:
        """
        Return the counter stored at ``line_number`` (1-based) of ``file_path``, or ``0`` when
        the file has no such line.

        :raises ValueError: if ``line_number`` is less than or equal to zero.
        """
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")
        cache_list = self._get_cache_list(file_path)
        idx = line_number - 1
        return cache_list[idx] if idx < len(cache_list) else 0

    def _add_number(self, file_path: str, line_number: int = 1) -> int:
        """
        Increment by one the counter stored at ``line_number`` of ``file_path``.

        :raises ValueError: if ``line_number`` is less than or equal to zero.
        :return: The incremented value.
        """
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")
        new_value = self._read_number(file_path, line_number) + 1
        self._set_number(new_value, file_path, line_number)
        return new_value

    def _set_number(self, new_value: int, file_path: str, line_number: int = 1) -> None:
        """
        Store ``new_value`` at ``line_number`` (1-based) of ``file_path`` and mark the file as
        modified. Missing lines up to ``line_number`` are filled with zeros.

        :raises ValueError: if ``new_value`` is negative or ``line_number`` is less than or equal to zero.
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")
        cache_list = self._get_cache_list(file_path)
        needed = line_number - len(cache_list)
        if needed > 0:
            cache_list.extend([0] * needed)
        cache_list[line_number - 1] = new_value
        self._dirty.add(file_path)

    def _check_metadata_args(self, entity_short_name: str, dataset_name: str) -> None:
        """
        Validate the arguments shared by the metadata counter methods.

        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        """
        if dataset_name is None:
            raise ValueError("dataset_name must be provided!")
        if entity_short_name not in self.metadata_short_names:
            raise ValueError("entity_short_name is not a known metadata short name!")

    def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None:
        """
        It allows to set the counter value of metadata entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``new_value`` is a negative integer, ``dataset_name`` is None or
          ``entity_short_name`` is not a known metadata short name.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")
        self._check_metadata_args(entity_short_name, dataset_name)
        file_path: str = self._get_metadata_path(entity_short_name, dataset_name)
        self._set_number(new_value, file_path, 1)

    def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to read the counter value of metadata entities.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The requested counter value.
        """
        self._check_metadata_args(entity_short_name, dataset_name)
        file_path: str = self._get_metadata_path(entity_short_name, dataset_name)
        return self._read_number(file_path, 1)

    def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to increment the counter value of metadata entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The newly-updated (already incremented) counter value.
        """
        self._check_metadata_args(entity_short_name, dataset_name)
        file_path: str = self._get_metadata_path(entity_short_name, dataset_name)
        return self._add_number(file_path, 1)