Coverage for oc_ocdm/counter_handler/redis_counter_handler.py: 73%
60 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-05-30 22:05 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-05-30 22:05 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2024, Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
17from typing import Dict, Optional, Tuple, Union
19import redis
20from tqdm import tqdm
22from oc_ocdm.counter_handler.counter_handler import CounterHandler
class RedisCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores
    the counter values within a Redis database.

    Graph and provenance counters are stored under the key produced by ``_get_key``; metadata
    counters are stored under ``metadata:<dataset_name>:<entity_short_name>``. A counter whose
    key does not exist is read as ``0``.
    """

    def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0, password: Optional[str] = None) -> None:
        """
        Constructor of the ``RedisCounterHandler`` class.

        :param host: Redis server host
        :type host: str
        :param port: Redis server port
        :type port: int
        :param db: Redis database number
        :type db: int
        :param password: Redis password (if required)
        :type password: Optional[str]
        """
        # decode_responses=True makes the client return ``str`` instead of ``bytes``.
        self.redis = redis.Redis(host=host, port=port, db=db, password=password, decode_responses=True)

    def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
                    identifier: int = 1, supplier_prefix: str = "") -> None:
        """
        It allows to set the counter value of graph and provenance entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        self.redis.set(key, new_value)

    def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to read the counter value of graph and provenance entities.

        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The requested counter value, or ``0`` if the counter has never been set.
        """
        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        value = self.redis.get(key)
        return int(value) if value is not None else 0

    def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
          graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The newly-updated (already incremented) counter value.
        """
        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        return self.redis.incr(key)

    def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None:
        """
        It allows to set the counter value of metadata entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``new_value`` is a negative integer or ``dataset_name`` is None
        :return: None
        """
        # The value is validated first, then the dataset name (inside the key helper).
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        key = self._get_metadata_key(entity_short_name, dataset_name)
        self.redis.set(key, new_value)

    def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to read the counter value of metadata entities.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None
        :return: The requested counter value, or ``0`` if the counter has never been set.
        """
        key = self._get_metadata_key(entity_short_name, dataset_name)
        value = self.redis.get(key)
        return int(value) if value is not None else 0

    def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to increment the counter value of metadata entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None
        :return: The newly-updated (already incremented) counter value.
        """
        key = self._get_metadata_key(entity_short_name, dataset_name)
        return self.redis.incr(key)

    @staticmethod
    def _get_metadata_key(entity_short_name: str, dataset_name: str) -> str:
        """
        Generate the Redis key of a metadata counter.

        :param entity_short_name: The short name associated to the type of the entity.
        :type entity_short_name: str
        :param dataset_name: The name of the dataset the counter belongs to.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None
        :return: The generated Redis key, in the form ``metadata:<dataset_name>:<entity_short_name>``
        :rtype: str
        """
        if dataset_name is None:
            raise ValueError("dataset_name must be provided!")

        return f"metadata:{dataset_name}:{entity_short_name}"

    def _get_key(self, entity_short_name: str, prov_short_name: str = "", identifier: Union[str, int, None] = None, supplier_prefix: str = "") -> str:
        """
        Generate a Redis key for the given parameters.

        The key is the colon-separated concatenation of ``entity_short_name`` and
        ``supplier_prefix``, followed by ``identifier`` and ``prov_short_name`` only when
        ``prov_short_name`` is not empty. Empty components are skipped.

        :param entity_short_name: The short name associated either to the type of the entity itself
          or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
          of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the identifier of the relative graph entity.
        :type identifier: Union[str, int, None]
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The generated Redis key
        :rtype: str
        """
        key_parts = [entity_short_name, supplier_prefix]
        if prov_short_name:
            # The identifier is only part of the key for provenance counters.
            key_parts.append(str(identifier))
            key_parts.append(prov_short_name)
        # filter(None, ...) drops empty components, e.g. an empty supplier prefix.
        return ':'.join(filter(None, key_parts))

    def batch_update_counters(self, updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]],
                              batch_size: int = 1_000_000) -> None:
        """
        Perform batch updates of counters, sending ``batch_size`` of them at a time through a
        Redis pipeline and reporting the progress with a progress bar.

        The updates are streamed directly into the pipeline, so no intermediate copy of the whole
        update set is built. As a consequence, if an entry turns out to be malformed, the batches
        preceding it may already have been written.

        :param updates: A dictionary structure containing the updates.
          The structure is as follows:
          {
            supplier_prefix: {
              (short_name, prov_short_name): {
                identifier: counter_value
              }
            }
          }
        :type updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]]
        :param batch_size: The maximum number of counters written by a single pipeline execution
        :type batch_size: int
        :raises ValueError: if ``batch_size`` is not a positive integer
        :return: None
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer!")

        # The total is derived from the nested dictionary sizes, without flattening them.
        total_updates = sum(
            len(counters)
            for per_supplier in updates.values()
            for counters in per_supplier.values()
        )

        with tqdm(total=total_updates, desc="Updating counters") as pbar:
            pipeline = None  # Created lazily, one pipeline per batch
            pending = 0
            for supplier_prefix, per_supplier in updates.items():
                for (short_name, prov_short_name), counters in per_supplier.items():
                    for identifier, counter_value in counters.items():
                        if pipeline is None:
                            pipeline = self.redis.pipeline()
                        key = self._get_key(short_name, prov_short_name, identifier, supplier_prefix)
                        pipeline.set(key, counter_value)
                        pending += 1
                        if pending == batch_size:
                            pipeline.execute()
                            pbar.update(pending)
                            pipeline, pending = None, 0

            # Flush the last, partially filled batch (if any).
            if pipeline is not None:
                pipeline.execute()
                pbar.update(pending)