Coverage for oc_ocdm / counter_handler / filesystem_counter_handler.py: 89%
135 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-28 18:52 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-28 18:52 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2020-2022 Simone Persiani <iosonopersia@gmail.com>
4# SPDX-FileCopyrightText: 2024 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8# -*- coding: utf-8 -*-
9from __future__ import annotations
11import os
12from shutil import copymode, move
13from tempfile import mkstemp
14from typing import TYPE_CHECKING
16if TYPE_CHECKING:
17 from typing import BinaryIO, Tuple, List, Dict
19from oc_ocdm.counter_handler.counter_handler import CounterHandler
20from oc_ocdm.support.support import is_string_empty
class FilesystemCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores
    the counter values within the filesystem.

    Every counter file is a plain-text file holding one decimal integer per line: line ``n``
    (1-based) holds the counter selected by ``identifier == n``. A blank or missing line is
    read as ``0``.
    """

    # Not referenced by any method of this class.
    # NOTE(review): presumably leftovers of an earlier fixed-width file layout -- confirm
    # that nothing outside this class reads them before removing.
    _initial_line_len: int = 3
    _trailing_char: str = " "

    def __init__(self, info_dir: str, supplier_prefix: str = "") -> None:
        """
        Constructor of the ``FilesystemCounterHandler`` class.

        :param info_dir: The path to the folder that does/will contain the counter values.
        :type info_dir: str
        :param supplier_prefix: The supplier prefix ``info_dir`` refers to. It is compared with the
          ``supplier_prefix`` argument of the other methods to decide which directory to use.
        :type supplier_prefix: str
        :raises ValueError: if ``info_dir`` is None or an empty string.
        """
        if info_dir is None or is_string_empty(info_dir):
            raise ValueError("info_dir parameter is required!")

        # Paths are built by plain string concatenation, so the folder must end with a separator.
        if not info_dir.endswith(os.sep):
            info_dir += os.sep

        self.info_dir: str = info_dir
        self.supplier_prefix: str = supplier_prefix
        self.datasets_dir: str = info_dir + 'datasets' + os.sep
        self.short_names: List[str] = ["an", "ar", "be", "br", "ci", "de", "id", "pl", "ra", "re", "rp"]
        self.metadata_short_names: List[str] = ["di"]
        self.info_files: Dict[str, str] = {key: ("info_file_" + key + ".txt")
                                           for key in self.short_names}
        self.prov_files: Dict[str, str] = {key: ("prov_file_" + key + ".txt")
                                           for key in self.short_names}

    def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
                    identifier: int = 1, supplier_prefix: str = "") -> None:
        """
        It allows to set the counter value of graph and provenance entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix used to select the counter directory.
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer or ``identifier`` is less than or equal to zero.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        self._set_number(new_value, file_path, identifier)

    def set_counters_batch(self, updates: Dict[Tuple[str, str], Dict[int, int]], supplier_prefix: str) -> None:
        """
        Updates counters in batch for multiple files.

        Files are processed one after the other, each with a single read and a single write.

        :param updates: A dictionary where the key is a tuple ``(entity_short_name, prov_short_name)``
          and the value is a dictionary of line numbers to new counter values.
        :type updates: Dict[Tuple[str, str], Dict[int, int]]
        :param supplier_prefix: The supplier prefix used to select the counter directory.
        :type supplier_prefix: str
        :raises ValueError: if any line number is less than or equal to zero or any new value is negative.
        :return: None
        """
        for (entity_short_name, prov_short_name), file_updates in updates.items():
            file_path = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
            self._set_numbers(file_path, file_updates)

    def _set_numbers(self, file_path: str, updates: Dict[int, int]) -> None:
        """
        Apply multiple counter updates to a single file.

        :param file_path: The path of the counter file to update.
        :type file_path: str
        :param updates: A dictionary where the key is the line number (identifier)
          and the value is the new counter value. An empty dictionary is a no-op.
        :type updates: Dict[int, int]
        :raises ValueError: if any line number is less than or equal to zero or any new value is negative.
        :return: None
        """
        if not updates:
            return

        # Validate everything before touching the file, so that a bad entry cannot leave it
        # half-updated. A non-positive line number would otherwise index from the end of the
        # list and overwrite an unrelated counter.
        for line_number, new_value in updates.items():
            if line_number <= 0:
                raise ValueError("line_number must be a positive non-zero integer number!")
            if new_value < 0:
                raise ValueError("new_value must be a non negative integer!")

        lines = self._load_lines(file_path, max(updates))
        for line_number, new_value in updates.items():
            lines[line_number - 1] = f"{new_value}\n"

        with open(file_path, 'w') as file:
            file.writelines(lines)

    def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to read the counter value of graph and provenance entities.

        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix used to select the counter directory.
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :return: The requested counter value.
        """
        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        return self._read_number(file_path, identifier)

    def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix used to select the counter directory.
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :return: The newly-updated (already incremented) counter value.
        """
        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        return self._add_number(file_path, identifier)

    def _get_counter_path(self, entity_short_name: str, prov_short_name: str, supplier_prefix: str) -> str:
        """Return the provenance file path when ``prov_short_name`` is ``"se"``, the info file path otherwise."""
        if prov_short_name == "se":
            return self._get_prov_path(entity_short_name, supplier_prefix)
        return self._get_info_path(entity_short_name, supplier_prefix)

    def _get_directory(self, supplier_prefix: str) -> str:
        """
        Return the directory holding the counter files for ``supplier_prefix``.

        ``self.info_dir`` is returned as is when the handler has no prefix of its own or when the
        requested prefix equals it. Otherwise the first occurrence of the handler's prefix inside
        ``self.info_dir`` is replaced with the requested one (``None`` is treated as ``""``).
        """
        supplier_prefix = "" if supplier_prefix is None else supplier_prefix
        if supplier_prefix == self.supplier_prefix or not self.supplier_prefix:
            return self.info_dir
        # NOTE(review): an empty requested prefix also lands here, which strips the handler's
        # prefix from the path -- confirm this is the intended meaning of the "" default.
        return self.info_dir.replace(self.supplier_prefix, supplier_prefix, 1)

    def _get_info_path(self, short_name: str, supplier_prefix: str) -> str:
        """Return the path of the info counter file of ``short_name`` (``KeyError`` if unknown)."""
        return self._get_directory(supplier_prefix) + self.info_files[short_name]

    def _get_prov_path(self, short_name: str, supplier_prefix: str) -> str:
        """Return the path of the provenance counter file of ``short_name`` (``KeyError`` if unknown)."""
        return self._get_directory(supplier_prefix) + self.prov_files[short_name]

    def _get_metadata_path(self, short_name: str, dataset_name: str) -> str:
        """Return the path of the metadata counter file of ``short_name`` inside ``dataset_name``."""
        return self.datasets_dir + dataset_name + os.sep + 'metadata_' + short_name + '.txt'

    def _get_checked_metadata_path(self, entity_short_name: str, dataset_name: str) -> str:
        """
        Validate the arguments of the metadata counter methods and return the counter file path.

        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        """
        if dataset_name is None:
            raise ValueError("dataset_name must be provided!")

        if entity_short_name not in self.metadata_short_names:
            raise ValueError("entity_short_name is not a known metadata short name!")

        return self._get_metadata_path(entity_short_name, dataset_name)

    def __initialize_file_if_not_existing(self, file_path: str) -> None:
        """Create the parent folder and the file itself (a single empty line) when missing."""
        directory = os.path.dirname(file_path)
        # A bare file name has no directory part, and makedirs("") would fail.
        if directory:
            # exist_ok avoids failing when the folder appears between a check and the creation.
            os.makedirs(directory, exist_ok=True)

        if not os.path.isfile(file_path):
            with open(file_path, 'w') as file:
                file.write("\n")

    def _load_lines(self, file_path: str, min_length: int) -> List[str]:
        """
        Read ``file_path`` (creating it if needed) and return its lines, each one terminated by a
        newline, padded with empty lines up to ``min_length`` entries.
        """
        self.__initialize_file_if_not_existing(file_path)

        with open(file_path, 'r') as file:
            lines = file.readlines()

        # Without a final newline, any line appended after the last one would be fused with it
        # when the list is written back.
        if lines and not lines[-1].endswith("\n"):
            lines[-1] += "\n"

        # A non-positive multiplier yields an empty list, so nothing is added when long enough.
        lines.extend(["\n"] * (min_length - len(lines)))
        return lines

    def _read_number(self, file_path: str, line_number: int) -> int:
        """
        Read the counter stored at line ``line_number`` (1-based) of ``file_path``.

        The file is created when missing. ``0`` is returned when the line does not exist, is blank,
        does not contain an integer, or when the file cannot be read; in the last two cases a
        message is printed.

        :raises ValueError: if ``line_number`` is less than or equal to zero.
        :return: The counter value, or ``0`` as described above.
        """
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        self.__initialize_file_if_not_existing(file_path)

        cur_number: int = 0
        try:
            with open(file_path, 'r') as file:
                for i, line in enumerate(file, 1):
                    if i == line_number:
                        line = line.strip()
                        if line:
                            cur_number = int(line)
                        break
        except ValueError as e:
            # The line holds something that is not an integer: fall back to zero.
            print(f"ValueError: {e}")
            cur_number = 0
        except Exception as e:
            # Best effort: any other failure is reported and the value read so far is returned.
            print(f"Unexpected error: {e}")
        return cur_number

    def _add_number(self, file_path: str, line_number: int = 1) -> int:
        """
        Increment by one the counter stored at line ``line_number`` of ``file_path``.

        :raises ValueError: if ``line_number`` is less than or equal to zero.
        :return: The incremented value, as written to the file.
        """
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        # _read_number creates the file when it does not exist yet.
        new_value = self._read_number(file_path, line_number) + 1
        self._set_number(new_value, file_path, line_number)
        return new_value

    def _set_number(self, new_value: int, file_path: str, line_number: int = 1) -> None:
        """
        Store ``new_value`` at line ``line_number`` (1-based) of ``file_path``.

        The file is created when missing and extended with empty lines when it is too short.

        :raises ValueError: if ``new_value`` is negative or ``line_number`` is less than or equal to zero.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        lines = self._load_lines(file_path, line_number)
        lines[line_number - 1] = f"{new_value}\n"

        with open(file_path, 'w') as file:
            file.writelines(lines)

    def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None:
        """
        It allows to set the counter value of metadata entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``new_value`` is a negative integer, ``dataset_name`` is None or
          ``entity_short_name`` is not a known metadata short name.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        self._set_number(new_value, file_path, 1)

    def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to read the counter value of metadata entities.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The requested counter value.
        """
        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        return self._read_number(file_path, 1)

    def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to increment the counter value of metadata entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The newly-updated (already incremented) counter value.
        """
        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        return self._add_number(file_path, 1)