Coverage for oc_ocdm / counter_handler / redis_counter_handler.py: 75%
65 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-28 18:52 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-28 18:52 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7# -*- coding: utf-8 -*-
9from typing import Dict, Optional, Tuple, Union, cast
11import redis
12from tqdm import tqdm
14from oc_ocdm.counter_handler.counter_handler import CounterHandler
class RedisCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores
    the counter values within a Redis database.

    Graph and provenance counters are stored under keys built by ``_get_key``; metadata
    counters are stored under ``metadata:<dataset_name>:<entity_short_name>`` keys.
    Instances can be pickled: the Redis client is dropped on serialization and re-created
    from the stored connection parameters on deserialization.
    """

    def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0, password: Optional[str] = None) -> None:
        """
        Constructor of the ``RedisCounterHandler`` class.

        :param host: Redis server host
        :type host: str
        :param port: Redis server port
        :type port: int
        :param db: Redis database number
        :type db: int
        :param password: Redis password (if required)
        :type password: Optional[str]
        """
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.redis = self._connect()

    def _connect(self) -> "redis.Redis":
        """
        Create a Redis client from the stored connection parameters.

        ``decode_responses=True`` makes the client return ``str`` values instead of ``bytes``.

        :return: A new Redis client
        """
        return redis.Redis(
            host=self.host,
            port=self.port,
            db=self.db,
            password=self.password,
            decode_responses=True
        )

    def __getstate__(self) -> Dict[str, object]:
        """Support for pickle serialization: drop the Redis client, which is not picklable."""
        state = self.__dict__.copy()
        # pop with a default so serialization does not fail if the client attribute is absent
        state.pop('redis', None)
        return state

    def __setstate__(self, state: dict[str, object]) -> None:
        """Support for pickle deserialization: restore attributes and reconnect to Redis."""
        vars(self).update(state)
        self.redis = self._connect()

    def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
                    identifier: int = 1, supplier_prefix: str = "") -> None:
        """
        It allows to set the counter value of graph and provenance entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        self.redis.set(key, new_value)

    def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to read the counter value of graph and provenance entities.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The requested counter value (0 if the counter was never set).
        """
        return self._read_int(self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix))

    def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The newly-updated (already incremented) counter value.
        """
        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        # INCR is performed server-side, so no read-modify-write happens in this process
        return cast(int, self.redis.incr(key))

    def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None:
        """
        It allows to set the counter value of metadata entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``new_value`` is a negative integer
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        self.redis.set(self._get_metadata_key(entity_short_name, dataset_name), new_value)

    def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to read the counter value of metadata entities.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :return: The requested counter value (0 if the counter was never set).
        """
        return self._read_int(self._get_metadata_key(entity_short_name, dataset_name))

    def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to increment the counter value of metadata entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :return: The newly-updated (already incremented) counter value.
        """
        return cast(int, self.redis.incr(self._get_metadata_key(entity_short_name, dataset_name)))

    def _read_int(self, key: str) -> int:
        """
        Read the integer stored at ``key``.

        :param key: The Redis key to read
        :type key: str
        :return: The stored value as an ``int``, or 0 when the key does not exist.
        """
        value = cast(Optional[str], self.redis.get(key))
        return int(value) if value is not None else 0

    @staticmethod
    def _get_metadata_key(entity_short_name: str, dataset_name: str) -> str:
        """
        Build the Redis key under which a metadata counter is stored.

        :param entity_short_name: The short name of the metadata entity type
        :type entity_short_name: str
        :param dataset_name: The name of the relative dataset
        :type dataset_name: str
        :return: A key of the form ``metadata:<dataset_name>:<entity_short_name>``
        :rtype: str
        """
        return f"metadata:{dataset_name}:{entity_short_name}"

    def _get_key(self, entity_short_name: str, prov_short_name: str = "", identifier: Union[str, int, None] = None, supplier_prefix: str = "") -> str:
        """
        Generate a Redis key for the given parameters.

        Empty parts are omitted, so a graph entity without supplier prefix maps to just
        ``<entity_short_name>``; provenance keys additionally carry ``<identifier>:<prov_short_name>``.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the identifier of the relative graph entity.
        :type identifier: Union[str, int, None]
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The generated Redis key
        :rtype: str
        """
        key_parts = [entity_short_name, supplier_prefix]
        if prov_short_name:
            key_parts.append(str(identifier))
            key_parts.append(prov_short_name)
        # filter(None, ...) drops empty strings (e.g. a missing supplier prefix)
        return ':'.join(filter(None, key_parts))

    def batch_update_counters(self, updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]],
                              batch_size: int = 1_000_000) -> None:
        """
        Perform batch updates of counters, sending ``batch_size`` SET commands per Redis
        pipeline and reporting progress with a progress bar.

        :param updates: A dictionary structure containing the updates.
                        The structure is as follows:
                        {
                            supplier_prefix: {
                                (short_name, prov_short_name): {
                                    identifier: counter_value
                                }
                            }
                        }
        :type updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]]
        :param batch_size: Maximum number of counters written per pipeline execution
        :type batch_size: int
        :raises ValueError: if ``batch_size`` is not positive, or if any counter value is negative
         (checked before anything is written, consistently with ``set_counter``)
        :return: None
        """
        from itertools import islice

        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer!")

        # Validate every value up front so an invalid input never causes a partial write.
        total_updates = 0
        for per_supplier in updates.values():
            for counters in per_supplier.values():
                if any(counter_value < 0 for counter_value in counters.values()):
                    raise ValueError("counter values must be non negative integers!")
                total_updates += len(counters)

        # Lazily produce (key, value) pairs instead of materializing them all in memory.
        pending = (
            (self._get_key(short_name, prov_short_name, identifier, supplier_prefix), counter_value)
            for supplier_prefix, per_supplier in updates.items()
            for (short_name, prov_short_name), counters in per_supplier.items()
            for identifier, counter_value in counters.items()
        )

        with tqdm(total=total_updates, desc="Updating counters") as pbar:
            while batch := list(islice(pending, batch_size)):
                with self.redis.pipeline() as pipeline:
                    for key, value in batch:
                        pipeline.set(key, value)
                    pipeline.execute()
                pbar.update(len(batch))