Coverage for oc_ocdm/counter_handler/filesystem_counter_handler.py: 88%
137 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-05-30 22:05 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
16from __future__ import annotations
18import os
19from shutil import copymode, move
20from tempfile import mkstemp
21from typing import TYPE_CHECKING
23if TYPE_CHECKING:
24 from typing import BinaryIO, Tuple, List, Dict
26from oc_ocdm.counter_handler.counter_handler import CounterHandler
27from oc_ocdm.support.support import is_string_empty
class FilesystemCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores
    the counter values within the filesystem.

    Counters are kept in plain-text files, one integer per line:

    * ``info_file_<short_name>.txt`` for graph entities;
    * ``prov_file_<short_name>.txt`` for provenance entities (used when ``prov_short_name``
      is ``"se"``);
    * ``datasets/<dataset_name>/metadata_<short_name>.txt`` for metadata entities.

    The ``identifier`` argument of the public methods is the 1-based line number of the
    counter inside the selected file.
    """

    # Not referenced by the methods of this class; kept so that external code relying
    # on these attributes keeps working.
    _initial_line_len: int = 3
    _trailing_char: str = " "

    def __init__(self, info_dir: str, supplier_prefix: str = "") -> None:
        """
        Constructor of the ``FilesystemCounterHandler`` class.

        :param info_dir: The path to the folder that does/will contain the counter values.
        :type info_dir: str
        :param supplier_prefix: The supplier prefix ``info_dir`` refers to. When a method is
          called with a different prefix, the first occurrence of this value inside ``info_dir``
          is replaced with the requested one in order to locate the counter files.
        :type supplier_prefix: str
        :raises ValueError: if ``info_dir`` is None or an empty string.
        """
        if info_dir is None or is_string_empty(info_dir):
            raise ValueError("info_dir parameter is required!")

        # Paths are built by plain concatenation, hence the mandatory trailing separator.
        if not info_dir.endswith(os.sep):
            info_dir += os.sep

        self.info_dir: str = info_dir
        self.supplier_prefix: str = supplier_prefix
        self.datasets_dir: str = info_dir + 'datasets' + os.sep
        self.short_names: List[str] = ["an", "ar", "be", "br", "ci", "de", "id", "pl", "ra", "re", "rp"]
        self.metadata_short_names: List[str] = ["di"]
        self.info_files: Dict[str, str] = {key: ("info_file_" + key + ".txt")
                                           for key in self.short_names}
        self.prov_files: Dict[str, str] = {key: ("prov_file_" + key + ".txt")
                                           for key in self.short_names}

    def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
                    identifier: int = 1, supplier_prefix: str = "") -> None:
        """
        It allows to set the counter value of graph and provenance entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix used to select the folder of the counter file
          (``None`` is treated as an empty string).
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer or ``identifier`` is less than or equal to zero.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        self._set_number(new_value, file_path, identifier)

    def set_counters_batch(self, updates: Dict[Tuple[str, str], Dict[int, int]], supplier_prefix: str) -> None:
        """
        Updates counters in batch for multiple files.

        :param updates: A dictionary where the key is a tuple ``(entity_short_name, prov_short_name)``
          and the value is a dictionary of line numbers to new counter values.
        :type updates: Dict[Tuple[str, str], Dict[int, int]]
        :param supplier_prefix: The supplier prefix used to select the folder of the counter files.
        :type supplier_prefix: str
        :raises ValueError: if a line number is less than or equal to zero or a counter value is negative.
        :return: None
        """
        for (entity_short_name, prov_short_name), file_updates in updates.items():
            file_path = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
            self._set_numbers(file_path, file_updates)

    def _set_numbers(self, file_path: str, updates: Dict[int, int]) -> None:
        """
        Apply multiple counter updates to a single file with one read and one write.

        :param file_path: The path of the counter file.
        :param updates: A dictionary where the key is the line number (identifier)
          and the value is the new counter value. An empty dictionary is a no-op.
        :raises ValueError: if a line number is less than or equal to zero or a counter value is negative.
        """
        if not updates:
            # Nothing to write; also avoids calling max() on an empty sequence.
            return

        # Validate everything before touching the file, so a bad entry cannot leave it
        # half-updated and a non-positive line number cannot overwrite a line counted
        # from the end of the file through negative indexing.
        for line_number, new_value in updates.items():
            if line_number <= 0:
                raise ValueError("line_number must be a positive non-zero integer number!")
            if new_value < 0:
                raise ValueError("new_value must be a non negative integer!")

        self.__initialize_file_if_not_existing(file_path)
        with open(file_path, 'r') as file:
            lines = file.readlines()

        # Pad with empty counters up to one line beyond the highest updated line number:
        # the pre-existing on-disk layout of this method is preserved on purpose.
        missing_lines = max(updates) + 1 - len(lines)
        if missing_lines > 0:
            lines.extend(["\n"] * missing_lines)

        for line_number, new_value in updates.items():
            lines[line_number - 1] = f"{new_value}\n"

        with open(file_path, 'w') as file:
            file.writelines(lines)

    def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1,
                     supplier_prefix: str = "") -> int:
        """
        It allows to read the counter value of graph and provenance entities.

        The counter file is created if it does not exist yet; a missing or empty line
        is read as zero.

        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix used to select the folder of the counter file
          (``None`` is treated as an empty string).
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :return: The requested counter value.
        """
        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        return self._read_number(file_path, identifier)

    def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1,
                          supplier_prefix: str = "") -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix used to select the folder of the counter file
          (``None`` is treated as an empty string).
        :type supplier_prefix: str
        :raises ValueError: if ``identifier`` is less than or equal to zero.
        :return: The newly-updated (already incremented) counter value.
        """
        file_path: str = self._get_counter_path(entity_short_name, prov_short_name, supplier_prefix)
        return self._add_number(file_path, identifier)

    def _get_counter_path(self, entity_short_name: str, prov_short_name: str, supplier_prefix: str) -> str:
        """Return the provenance file path when ``prov_short_name`` is ``"se"``, the info file path otherwise."""
        if prov_short_name == "se":
            return self._get_prov_path(entity_short_name, supplier_prefix)
        return self._get_info_path(entity_short_name, supplier_prefix)

    def _get_directory(self, supplier_prefix: str) -> str:
        """Return the folder holding the counter files of the given supplier prefix."""
        supplier_prefix = "" if supplier_prefix is None else supplier_prefix
        if supplier_prefix == self.supplier_prefix or not self.supplier_prefix:
            return self.info_dir
        # Only the first occurrence of the handler's own prefix is swapped.
        return self.info_dir.replace(self.supplier_prefix, supplier_prefix, 1)

    def _get_info_path(self, short_name: str, supplier_prefix: str) -> str:
        """Return the path of the info file of ``short_name`` (``KeyError`` if it is unknown)."""
        return self._get_directory(supplier_prefix) + self.info_files[short_name]

    def _get_prov_path(self, short_name: str, supplier_prefix: str) -> str:
        """Return the path of the provenance file of ``short_name`` (``KeyError`` if it is unknown)."""
        return self._get_directory(supplier_prefix) + self.prov_files[short_name]

    def _get_metadata_path(self, short_name: str, dataset_name: str) -> str:
        """Return the path of the metadata counter file of ``short_name`` for ``dataset_name``."""
        return self.datasets_dir + dataset_name + os.sep + 'metadata_' + short_name + '.txt'

    def _get_checked_metadata_path(self, entity_short_name: str, dataset_name: str) -> str:
        """Validate the arguments of the metadata methods and return the metadata counter file path.

        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known
          metadata short name.
        """
        if dataset_name is None:
            raise ValueError("dataset_name must be provided!")

        if entity_short_name not in self.metadata_short_names:
            raise ValueError("entity_short_name is not a known metadata short name!")

        return self._get_metadata_path(entity_short_name, dataset_name)

    def __initialize_file_if_not_existing(self, file_path: str) -> None:
        """Create the parent folder and the counter file (a single empty line) when missing."""
        directory = os.path.dirname(file_path)
        if directory:
            # exist_ok avoids failing when the folder appears between a check and the creation.
            os.makedirs(directory, exist_ok=True)

        if not os.path.isfile(file_path):
            with open(file_path, 'w') as file:
                file.write("\n")

    def _read_number(self, file_path: str, line_number: int) -> int:
        """
        Read the counter stored at the given 1-based line of ``file_path``.

        The file is created if it does not exist. Zero is returned when the line is missing,
        empty or not an integer, and when the file cannot be read; in the last two cases
        the error is reported on standard output.

        :raises ValueError: if ``line_number`` is less than or equal to zero.
        """
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        self.__initialize_file_if_not_existing(file_path)

        cur_number: int = 0
        try:
            with open(file_path, 'r') as file:
                for i, line in enumerate(file, 1):
                    if i == line_number:
                        line = line.strip()
                        if line:
                            cur_number = int(line)
                        break
        except ValueError as e:
            print(f"ValueError: {e}")
            cur_number = 0
        except Exception as e:
            # Best effort by design: the caller gets zero instead of an exception.
            print(f"Unexpected error: {e}")
        return cur_number

    def _add_number(self, file_path: str, line_number: int = 1) -> int:
        """
        Increment by one the counter stored at the given 1-based line and return the new value.

        The read and the write are two separate steps.

        :raises ValueError: if ``line_number`` is less than or equal to zero.
        """
        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        # _read_number and _set_number both create the file when it is missing.
        new_value = self._read_number(file_path, line_number) + 1
        self._set_number(new_value, file_path, line_number)
        return new_value

    def _set_number(self, new_value: int, file_path: str, line_number: int = 1) -> None:
        """
        Store ``new_value`` at the given 1-based line of ``file_path``, rewriting the whole file.

        :raises ValueError: if ``new_value`` is negative or ``line_number`` is less than or
          equal to zero.
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        if line_number <= 0:
            raise ValueError("line_number must be a positive non-zero integer number!")

        self.__initialize_file_if_not_existing(file_path)

        with open(file_path, 'r') as file:
            lines = file.readlines()

        # Ensure the file has enough lines: the missing ones become empty counters.
        missing_lines = line_number - len(lines)
        if missing_lines > 0:
            lines.extend(["\n"] * missing_lines)

        lines[line_number - 1] = f"{new_value}\n"

        with open(file_path, 'w') as file:
            file.writelines(lines)

    def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None:
        """
        It allows to set the counter value of metadata entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``new_value`` is a negative integer, ``dataset_name`` is None or
          ``entity_short_name`` is not a known metadata short name.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        self._set_number(new_value, file_path, 1)

    def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to read the counter value of metadata entities.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The requested counter value.
        """
        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        return self._read_number(file_path, 1)

    def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to increment the counter value of metadata entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None or ``entity_short_name`` is not a known metadata short name.
        :return: The newly-updated (already incremented) counter value.
        """
        file_path: str = self._get_checked_metadata_path(entity_short_name, dataset_name)
        return self._add_number(file_path, 1)