Coverage for oc_ocdm/counter_handler/redis_counter_handler.py: 77%

71 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-12-05 23:58 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2024, Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17from typing import Dict, Optional, Tuple, Union 

18 

19import redis 

20from tqdm import tqdm 

21 

22from oc_ocdm.counter_handler.counter_handler import CounterHandler 

23 

24 

class RedisCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores
    the counter values within a Redis database.

    Keys are laid out as follows (empty components are omitted from graph/provenance keys):

    * graph entities: ``<entity_short_name>:<supplier_prefix>``
    * provenance entities: ``<entity_short_name>:<supplier_prefix>:<identifier>:<prov_short_name>``
    * metadata entities: ``metadata:<dataset_name>:<entity_short_name>``
    """

    def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0, password: Optional[str] = None) -> None:
        """
        Constructor of the ``RedisCounterHandler`` class.

        The connection parameters are kept as plain attributes so that the Redis
        client can be rebuilt after unpickling (see ``__getstate__``/``__setstate__``).

        :param host: Redis server host
        :type host: str
        :param port: Redis server port
        :type port: int
        :param db: Redis database number
        :type db: int
        :param password: Redis password (if required)
        :type password: Optional[str]
        """
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.redis = self._connect()

    def _connect(self) -> redis.Redis:
        """
        Build a Redis client from the stored connection parameters.

        ``decode_responses=True`` makes the client return ``str`` values instead of ``bytes``.

        :return: A Redis client bound to ``host``/``port``/``db``/``password``
        :rtype: redis.Redis
        """
        return redis.Redis(
            host=self.host,
            port=self.port,
            db=self.db,
            password=self.password,
            decode_responses=True,
        )

    def __getstate__(self) -> Dict[str, object]:
        """
        Support for pickle serialization.

        The Redis connection is not picklable, so it is left out of the state;
        only the connection parameters are serialized.

        :return: A shallow copy of the instance dictionary without the ``redis`` client
        """
        state = self.__dict__.copy()
        # pop() instead of del: tolerate instances whose client was never created.
        state.pop('redis', None)
        return state

    def __setstate__(self, state: Dict[str, object]) -> None:
        """
        Support for pickle deserialization.

        Restores the serialized attributes and recreates the Redis connection
        from the restored connection parameters.

        :param state: The state previously produced by ``__getstate__``
        :return: None
        """
        self.__dict__.update(state)
        self.redis = self._connect()

    def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
                    identifier: int = 1, supplier_prefix: str = "") -> None:
        """
        It allows to set the counter value of graph and provenance entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        self.redis.set(key, new_value)

    def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to read the counter value of graph and provenance entities.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The requested counter value, or 0 if the key does not exist.
        """
        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        value = self.redis.get(key)
        return int(value) if value is not None else 0

    def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        The increment is delegated to Redis ``INCR``; a missing key is treated by
        Redis as 0, so the first call returns 1.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The newly-updated (already incremented) counter value.
        """
        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        return self.redis.incr(key)

    def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None:
        """
        It allows to set the counter value of metadata entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``new_value`` is a negative integer or ``dataset_name`` is None
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        key = self._get_metadata_key(entity_short_name, dataset_name)
        self.redis.set(key, new_value)

    def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to read the counter value of metadata entities.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None
        :return: The requested counter value, or 0 if the key does not exist.
        """
        key = self._get_metadata_key(entity_short_name, dataset_name)
        value = self.redis.get(key)
        return int(value) if value is not None else 0

    def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to increment the counter value of metadata entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None
        :return: The newly-updated (already incremented) counter value.
        """
        key = self._get_metadata_key(entity_short_name, dataset_name)
        return self.redis.incr(key)

    @staticmethod
    def _get_metadata_key(entity_short_name: str, dataset_name: str) -> str:
        """
        Generate the Redis key of a metadata counter.

        :param entity_short_name: The short name associated to the type of the metadata entity.
        :type entity_short_name: str
        :param dataset_name: The name of the dataset the counter belongs to.
        :type dataset_name: str
        :raises ValueError: if ``dataset_name`` is None
        :return: The key ``metadata:<dataset_name>:<entity_short_name>``
        :rtype: str
        """
        if dataset_name is None:
            raise ValueError("dataset_name must be provided!")
        return f"metadata:{dataset_name}:{entity_short_name}"

    def _get_key(self, entity_short_name: str, prov_short_name: str = "", identifier: Union[str, int, None] = None, supplier_prefix: str = "") -> str:
        """
        Generate a Redis key for the given parameters.

        The key is the ``:``-joined sequence ``entity_short_name, supplier_prefix`` followed,
        for provenance entities only, by ``identifier, prov_short_name``. Empty components
        are dropped, so e.g. an empty ``supplier_prefix`` does not leave a ``::`` gap.

        NOTE: when ``prov_short_name`` is set and ``identifier`` is left as None, the literal
        text ``"None"`` ends up in the key; the public methods always pass an identifier.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the identifier of the relative graph entity.
        :type identifier: Union[str, int, None]
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The generated Redis key
        :rtype: str
        """
        key_parts = [entity_short_name, supplier_prefix]
        if prov_short_name:
            key_parts.append(str(identifier))
            key_parts.append(prov_short_name)
        return ':'.join(filter(None, key_parts))

    def batch_update_counters(self, updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]],
                              batch_size: int = 1_000_000) -> None:
        """
        Perform batch updates of counters, writing them through Redis pipelines in batches
        of at most ``batch_size`` commands, with a progress bar.

        Keys are generated lazily, so only one batch of ``(key, value)`` pairs is held in
        memory at a time. Unlike ``set_counter``, values are not checked for negativity.

        :param updates: A dictionary structure containing the updates.
            The structure is as follows:
            {
                supplier_prefix: {
                    (short_name, prov_short_name): {
                        identifier: counter_value
                    }
                }
            }
        :type updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]]
        :param batch_size: Maximum number of ``SET`` commands sent per pipeline (default: 1,000,000)
        :type batch_size: int
        :raises ValueError: if ``batch_size`` is not a positive integer
        :return: None
        """
        from itertools import islice  # only needed by this method

        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer!")

        # Count the updates without building their keys, so the progress bar has a total.
        total_updates = sum(
            len(counters)
            for per_supplier in updates.values()
            for counters in per_supplier.values()
        )
        pending = (
            (self._get_key(short_name, prov_short_name, identifier, supplier_prefix), counter_value)
            for supplier_prefix, per_supplier in updates.items()
            for (short_name, prov_short_name), counters in per_supplier.items()
            for identifier, counter_value in counters.items()
        )

        with tqdm(total=total_updates, desc="Updating counters") as pbar:
            while True:
                batch = list(islice(pending, batch_size))
                if not batch:
                    break
                # Used as a context manager, the pipeline is reset on exit even if execute() raises.
                with self.redis.pipeline() as pipeline:
                    for key, value in batch:
                        pipeline.set(key, value)
                    pipeline.execute()
                pbar.update(len(batch))

248