Coverage for oc_ocdm / counter_handler / filesystem_counter_handler.py: 95%
147 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8# -*- coding: utf-8 -*-
9from __future__ import annotations
11import os
12from typing import TYPE_CHECKING
14if TYPE_CHECKING:
15 from typing import Dict, List, Tuple
17from oc_ocdm.counter_handler.counter_handler import CounterHandler
18from oc_ocdm.support.support import is_string_empty
class FilesystemCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores the counter values within the filesystem.

    Counter data is loaded into RAM lazily and written back to disk only when ``flush()`` is called.
    Graph and provenance counter files are read once per supplier-prefix directory, on first access to
    that prefix. Any other counter file (e.g. dataset metadata counters) is read on its first access.
    Each counter file stores one integer per line; line N holds the counter identified by N, and an
    empty line stands for 0."""

    def __init__(self, info_dir: str, supplier_prefix: str = "") -> None:
        """
        Constructor of the ``FilesystemCounterHandler`` class.

        :param info_dir: The path to the folder that does/will contain the counter values.
        :type info_dir: str
        :param supplier_prefix: The supplier prefix associated with ``info_dir``. Counters requested for
          a different supplier prefix are stored in the directory obtained by substituting this prefix
          inside ``info_dir``.
        :type supplier_prefix: str
        :raises ValueError: if ``info_dir`` is None or an empty string.
        """
        if info_dir is None or is_string_empty(info_dir):
            raise ValueError("info_dir parameter is required!")

        # All paths below are built by string concatenation, so a trailing separator is required.
        if info_dir[-1] != os.sep:
            info_dir += os.sep

        self.info_dir: str = info_dir
        self.supplier_prefix: str = supplier_prefix
        self.datasets_dir: str = info_dir + "datasets" + os.sep
        self.short_names: List[str] = ["an", "ar", "be", "br", "ci", "de", "id", "pl", "ra", "re", "rp"]
        self.metadata_short_names: List[str] = ["di"]
        self.info_files: Dict[str, str] = {key: ("info_file_" + key + ".txt") for key in self.short_names}
        self.prov_files: Dict[str, str] = {key: ("prov_file_" + key + ".txt") for key in self.short_names}

        # Counter file path -> its values; line N of the file is stored at index N - 1.
        self._cache: Dict[str, List[int]] = {}
        # Paths whose cached values have been modified and must be written by ``flush()``.
        self._dirty: set[str] = set()
        # Supplier-prefix directories whose info/prov files have already been read into ``_cache``.
        self._loaded_dirs: set[str] = set()

        self._ensure_loaded(supplier_prefix)

    def _get_prefix_dir(self, supplier_prefix: str) -> str:
        """Return the directory (ending with ``os.sep``) holding the counters of ``supplier_prefix``."""
        sp = "" if supplier_prefix is None else supplier_prefix
        if sp == self.supplier_prefix or not self.supplier_prefix:
            return self.info_dir
        # NOTE(review): str.replace rewrites the FIRST textual occurrence of the handler's prefix anywhere
        # in info_dir. If that string also appears in an earlier path component, the wrong component is
        # substituted. Likewise an empty ``sp`` (the default) strips the prefix out of the path rather than
        # meaning "this handler's prefix" -- confirm both against the expected directory layout.
        return self.info_dir.replace(self.supplier_prefix, sp, 1)

    @staticmethod
    def _parse_counter_file(file_path: str) -> List[int]:
        """Read a counter file, mapping each line to an int (blank or whitespace-only lines become 0)."""
        values = []
        with open(file_path, "r") as f:
            for line in f:
                # strip() (not rstrip("\n")) so that CRLF endings and whitespace-only lines parse cleanly.
                text = line.strip()
                values.append(int(text) if text else 0)
        return values

    def _ensure_loaded(self, supplier_prefix: str) -> None:
        """Load every ``info_file_*.txt`` / ``prov_file_*.txt`` of the prefix directory, once per directory."""
        prefix_dir = self._get_prefix_dir(supplier_prefix)
        if prefix_dir in self._loaded_dirs:
            return
        if os.path.isdir(prefix_dir):
            for filename in os.listdir(prefix_dir):
                if not filename.endswith(".txt"):
                    continue
                if not filename.startswith(("info_file_", "prov_file_")):
                    continue
                filepath = prefix_dir + filename
                # Never overwrite values already in memory: they may hold unflushed modifications.
                if filepath in self._cache:
                    continue
                self._cache[filepath] = self._parse_counter_file(filepath)
        self._loaded_dirs.add(prefix_dir)

    def _get_cache_list(self, file_path: str) -> List[int]:
        """
        Return the in-memory list of counter values for ``file_path``.

        The list is read from disk on first access if the file exists. This is how metadata counter files
        get loaded, since ``_ensure_loaded`` does not scan the datasets directory. Otherwise it starts as ``[0]``.
        """
        cache_list = self._cache.get(file_path)
        if cache_list is None:
            if os.path.isfile(file_path):
                cache_list = self._parse_counter_file(file_path)
            else:
                cache_list = [0]
            self._cache[file_path] = cache_list
        return cache_list

    def flush(self) -> None:
        """
        Write every modified counter file back to disk, creating missing parent directories.

        A counter equal to 0 is written as an empty line. The set of modified files is cleared only
        after all of them have been written.
        """
        for file_path in self._dirty:
            dir_path = os.path.dirname(file_path)
            if dir_path:
                os.makedirs(dir_path, exist_ok=True)
            with open(file_path, "w") as f:
                f.writelines(f"{v}\n" if v else "\n" for v in self._cache[file_path])
        self._dirty.clear()

    def set_counter(
        self,
        new_value: int,
        entity_short_name: str,
        prov_short_name: str = "",
        identifier: int = 1,
        supplier_prefix: str = "",
    ) -> None:
        """
        It allows to set the counter value of graph and provenance entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix whose counter directory is used.
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer or ``identifier`` is less than or equal to zero.
        :raises KeyError: if ``entity_short_name`` is not a known short name.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")
        self._ensure_loaded(supplier_prefix)
        if prov_short_name == "se":
            file_path: str = self._get_prov_path(entity_short_name, supplier_prefix)
        else:
            file_path: str = self._get_info_path(entity_short_name, supplier_prefix)
        self._set_number(new_value, file_path, identifier)

    def set_counters_batch(self, updates: Dict[Tuple[str, str], Dict[int, int]], supplier_prefix: str) -> None:
        """
        Updates counters in batch for multiple files.

        :param updates: A dictionary where the key is a tuple (entity_short_name, prov_short_name)
          and the value is a dictionary of line numbers (identifiers) to new counter values.
          A ``prov_short_name`` equal to ``"se"`` selects the provenance file of the entity type.
        :type updates: Dict[Tuple[str, str], Dict[int, int]]
        :param supplier_prefix: The supplier prefix whose counter directory is used.
        :type supplier_prefix: str
        :raises ValueError: if any line number is less than or equal to zero or any value is negative.
        :return: None
        """
        self._ensure_loaded(supplier_prefix)
        for (entity_short_name, prov_short_name), file_updates in updates.items():
            file_path = (
                self._get_prov_path(entity_short_name, supplier_prefix)
                if prov_short_name == "se"
                else self._get_info_path(entity_short_name, supplier_prefix)
            )
            self._set_numbers(file_path, file_updates)

    def _set_numbers(self, file_path: str, updates: Dict[int, int]) -> None:
        """
        Apply multiple counter updates to a single file.

        ``updates`` maps a line number (identifier, starting from 1) to its new counter value.
        All entries are validated before any value is modified, so an invalid entry leaves the
        cache untouched. An empty ``updates`` is a no-op.
        """
        if not updates:
            return
        for line_number, new_value in updates.items():
            if new_value < 0:
                raise ValueError("new_value must be a non negative integer!")
            if line_number <= 0:
                # Without this check, line_number 0 would silently overwrite the last counter (index -1).
                raise ValueError("line_number must be a positive non-zero integer number!")
        cache_list = self._get_cache_list(file_path)
        # Line N lives at index N - 1, so the list must be exactly max(line_number) long.
        needed = max(updates) - len(cache_list)
        if needed > 0:
            cache_list.extend([0] * needed)
        for line_number, new_value in updates.items():
            cache_list[line_number - 1] = new_value
        self._dirty.add(file_path)

    def read_counter(
        self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = ""
    ) -> int:
        """
        It allows to read the counter value of graph and provenance entities.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix whose counter directory is used.
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :raises KeyError: if ``entity_short_name`` is not a known short name.
        :return: The requested counter value (0 if it was never set).
        """
        self._ensure_loaded(supplier_prefix)
        if prov_short_name == "se":
            file_path: str = self._get_prov_path(entity_short_name, supplier_prefix)
        else:
            file_path: str = self._get_info_path(entity_short_name, supplier_prefix)
        return self._read_number(file_path, identifier)

    def increment_counter(
        self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = ""
    ) -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix whose counter directory is used.
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :raises KeyError: if ``entity_short_name`` is not a known short name.
        :return: The newly-updated (already incremented) counter value.
        """
        self._ensure_loaded(supplier_prefix)
        if prov_short_name == "se":
            file_path: str = self._get_prov_path(entity_short_name, supplier_prefix)
        else:
            file_path: str = self._get_info_path(entity_short_name, supplier_prefix)
        return self._add_number(file_path, identifier)

    def _get_info_path(self, short_name: str, supplier_prefix: str) -> str:
        """Path of the graph-entity counter file for ``short_name`` (KeyError if unknown)."""
        return self._get_prefix_dir(supplier_prefix) + self.info_files[short_name]

    def _get_prov_path(self, short_name: str, supplier_prefix: str) -> str:
        """Path of the provenance counter file for ``short_name`` (KeyError if unknown)."""
        return self._get_prefix_dir(supplier_prefix) + self.prov_files[short_name]

    def _get_metadata_path(self, short_name: str, dataset_name: str) -> str:
        """Path of the metadata counter file ``datasets/<dataset_name>/metadata_<short_name>.txt``."""
        return self.datasets_dir + dataset_name + os.sep + "metadata_" + short_name + ".txt"

    def _read_number(self, file_path: str, line_number: int) -> int:
        """Return the counter stored at ``line_number`` (1-based) of ``file_path``, or 0 if absent."""
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")
        cache_list = self._get_cache_list(file_path)
        idx = line_number - 1
        return cache_list[idx] if idx < len(cache_list) else 0

    def _add_number(self, file_path: str, line_number: int = 1) -> int:
        """Increment the counter at ``line_number`` of ``file_path`` by one and return the new value."""
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")
        current_value = self._read_number(file_path, line_number)
        new_value = current_value + 1
        self._set_number(new_value, file_path, line_number)
        return new_value

    def _set_number(self, new_value: int, file_path: str, line_number: int = 1) -> None:
        """Store ``new_value`` at ``line_number`` (1-based) of ``file_path``, padding with zeros as needed."""
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")
        cache_list = self._get_cache_list(file_path)
        needed = line_number - len(cache_list)
        if needed > 0:
            cache_list.extend([0] * needed)
        cache_list[line_number - 1] = new_value
        self._dirty.add(file_path)

    def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None:
        """
        It allows to set the counter value of metadata entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``new_value`` is a negative integer, ``dataset_name`` is None or
         ``entity_short_name`` is not a known metadata short name.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")
        if dataset_name is None:
            raise ValueError("dataset_name must be provided!")
        if entity_short_name not in self.metadata_short_names:
            raise ValueError("entity_short_name is not a known metadata short name!")
        file_path: str = self._get_metadata_path(entity_short_name, dataset_name)
        self._set_number(new_value, file_path, 1)

    def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to read the counter value of metadata entities.

        The counter file is read from disk on first access, so values persisted by a previous
        ``flush()`` are visible.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The requested counter value.
        """
        if dataset_name is None:
            raise ValueError("dataset_name must be provided!")
        if entity_short_name not in self.metadata_short_names:
            raise ValueError("entity_short_name is not a known metadata short name!")
        file_path: str = self._get_metadata_path(entity_short_name, dataset_name)
        return self._read_number(file_path, 1)

    def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to increment the counter value of metadata entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The newly-updated (already incremented) counter value.
        """
        if dataset_name is None:
            raise ValueError("dataset_name must be provided!")
        if entity_short_name not in self.metadata_short_names:
            raise ValueError("entity_short_name is not a known metadata short name!")
        file_path: str = self._get_metadata_path(entity_short_name, dataset_name)
        return self._add_number(file_path, 1)