Coverage for oc_ocdm/counter_handler/redis_counter_handler.py: 77%
71 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-12-05 23:58 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-12-05 23:58 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2024, Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
17from typing import Dict, Optional, Tuple, Union
19import redis
20from tqdm import tqdm
22from oc_ocdm.counter_handler.counter_handler import CounterHandler
class RedisCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores
    the counter values within a Redis database.

    Counters of graph and provenance entities are stored under keys built by
    :meth:`_get_key`; counters of metadata entities are stored under
    ``metadata:<dataset_name>:<entity_short_name>``.
    """

    def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0, password: Optional[str] = None) -> None:
        """
        Constructor of the ``RedisCounterHandler`` class.

        The connection parameters are kept on the instance so that the client
        can be rebuilt after unpickling (see ``__setstate__``).

        :param host: Redis server host
        :type host: str
        :param port: Redis server port
        :type port: int
        :param db: Redis database number
        :type db: int
        :param password: Redis password (if required)
        :type password: Optional[str]
        """
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.redis = self._connect()

    def _connect(self):
        """
        Build a Redis client from the connection parameters stored on the instance.

        :return: A new ``redis.Redis`` client created with ``decode_responses=True``
        """
        return redis.Redis(
            host=self.host,
            port=self.port,
            db=self.db,
            password=self.password,
            decode_responses=True
        )

    def __getstate__(self):
        """
        Support for pickle serialization.

        :return: A copy of the instance dictionary without the ``redis`` client
        :rtype: dict
        """
        state = self.__dict__.copy()
        # The Redis client is dropped because it is not picklable. ``pop`` with a
        # default is used so that an instance lacking the attribute can still be
        # serialized.
        # NOTE: ``password`` stays in the pickled state, since it is needed to
        # reconnect on deserialization.
        state.pop('redis', None)
        return state

    def __setstate__(self, state):
        """
        Support for pickle deserialization.

        :param state: The dictionary produced by ``__getstate__``
        :type state: dict
        :return: None
        """
        self.__dict__.update(state)
        # Recreate the Redis client from the restored connection parameters
        self.redis = self._connect()

    def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
                    identifier: int = 1, supplier_prefix: str = "") -> None:
        """
        It allows to set the counter value of graph and provenance entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself
            or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
            of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
            graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        self.redis.set(key, new_value)

    def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to read the counter value of graph and provenance entities.

        :param entity_short_name: The short name associated either to the type of the entity itself
            or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
            of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
            graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The requested counter value, or 0 when no value is stored under the key.
        :rtype: int
        """
        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        value = self.redis.get(key)
        # A missing key is reported as a counter that has never been incremented
        return int(value) if value is not None else 0

    def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself
            or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
            of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
            graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The newly-updated (already incremented) counter value.
        :rtype: int
        """
        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        return self.redis.incr(key)

    @staticmethod
    def _get_metadata_key(entity_short_name: str, dataset_name: str) -> str:
        """
        Generate the Redis key under which a metadata counter is stored.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None
        :return: The generated Redis key
        :rtype: str
        """
        if dataset_name is None:
            raise ValueError("dataset_name must be provided!")
        return f"metadata:{dataset_name}:{entity_short_name}"

    def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None:
        """
        It allows to set the counter value of metadata entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``new_value`` is a negative integer or ``dataset_name`` is None
        :return: None
        """
        # ``new_value`` is validated before the key is built, so a negative value is
        # reported first when both arguments are invalid.
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        key = self._get_metadata_key(entity_short_name, dataset_name)
        self.redis.set(key, new_value)

    def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to read the counter value of metadata entities.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None
        :return: The requested counter value, or 0 when no value is stored under the key.
        :rtype: int
        """
        key = self._get_metadata_key(entity_short_name, dataset_name)
        value = self.redis.get(key)
        return int(value) if value is not None else 0

    def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to increment the counter value of metadata entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None
        :return: The newly-updated (already incremented) counter value.
        :rtype: int
        """
        key = self._get_metadata_key(entity_short_name, dataset_name)
        return self.redis.incr(key)

    def _get_key(self, entity_short_name: str, prov_short_name: str = "", identifier: Union[str, int, None] = None, supplier_prefix: str = "") -> str:
        """
        Generate a Redis key for the given parameters.

        The key is ``entity_short_name:supplier_prefix`` for a graph entity and
        ``entity_short_name:supplier_prefix:identifier:prov_short_name`` for a
        provenance entity. Empty components are left out of the key.

        :param entity_short_name: The short name associated either to the type of the entity itself
            or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
            of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the identifier of the relative graph entity.
            It is ignored when ``prov_short_name`` is empty.
        :type identifier: Union[str, int, None]
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The generated Redis key
        :rtype: str
        """
        key_parts = [entity_short_name, supplier_prefix]
        if prov_short_name:
            # NOTE: with the default ``identifier=None`` this contributes the literal
            # text "None" to the key; the public methods always pass an identifier.
            key_parts.append(str(identifier))
            key_parts.append(prov_short_name)
        # ``filter(None, ...)`` drops empty components such as a missing supplier prefix
        return ':'.join(filter(None, key_parts))

    def _iter_batch_updates(self, updates):
        """
        Lazily flatten the nested ``updates`` structure into ``(key, counter_value)`` pairs.

        :param updates: The nested dictionary accepted by ``batch_update_counters``
        :type updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]]
        :return: A generator yielding one ``(redis_key, counter_value)`` tuple per counter,
            in the iteration order of the dictionaries
        """
        for supplier_prefix, by_entity in updates.items():
            for (short_name, prov_short_name), counters in by_entity.items():
                for identifier, counter_value in counters.items():
                    yield self._get_key(short_name, prov_short_name, identifier, supplier_prefix), counter_value

    def batch_update_counters(self, updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]],
                              batch_size: int = 1_000_000) -> None:
        """
        Perform batch updates of counters, processing ``batch_size`` of them at a time
        (1 million by default) with a progress bar.

        The updates are streamed out of ``updates`` one batch at a time, so at most
        ``batch_size`` key/value pairs are held in memory besides the input itself.
        Each batch is sent through its own Redis pipeline.

        Unlike ``set_counter``, the counter values are written as given, without
        checking that they are non negative.

        :param updates: A dictionary structure containing the updates.
            The structure is as follows:
            {
                supplier_prefix: {
                    (short_name, prov_short_name): {
                        identifier: counter_value
                    }
                }
            }
        :type updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]]
        :param batch_size: The maximum number of counters written per pipeline execution
        :type batch_size: int
        :raises ValueError: if ``batch_size`` is not a positive integer
        :return: None
        """
        # Imported locally: only this method needs it
        from itertools import islice

        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer!")

        # The total is computed from the dictionary sizes so that the progress bar
        # can be sized without materializing the flattened updates.
        total_updates = sum(
            len(counters)
            for by_entity in updates.values()
            for counters in by_entity.values()
        )
        pending = self._iter_batch_updates(updates)

        with tqdm(total=total_updates, desc="Updating counters") as pbar:
            while True:
                batch = list(islice(pending, batch_size))
                if not batch:
                    break
                pipeline = self.redis.pipeline()
                for key, value in batch:
                    pipeline.set(key, value)
                pipeline.execute()
                pbar.update(len(batch))