Coverage for oc_ocdm / counter_handler / redis_counter_handler.py: 75%

65 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-28 18:52 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7# -*- coding: utf-8 -*- 

8 

9from typing import Dict, Optional, Tuple, Union, cast 

10 

11import redis 

12from tqdm import tqdm 

13 

14from oc_ocdm.counter_handler.counter_handler import CounterHandler 

15 

16 

class RedisCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores
    the counter values within a Redis database.

    Graph and provenance counters live under keys built by :meth:`_get_key`, while metadata
    counters live under ``metadata:<dataset_name>:<entity_short_name>``. Values are written
    with ``SET``/``INCR`` and read back as integers; a key that does not exist reads as ``0``.
    """

    def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0, password: Optional[str] = None) -> None:
        """
        Constructor of the ``RedisCounterHandler`` class.

        :param host: Redis server host
        :type host: str
        :param port: Redis server port
        :type port: int
        :param db: Redis database number
        :type db: int
        :param password: Redis password (if required)
        :type password: Optional[str]
        """
        # The connection parameters are kept as plain attributes so that the client can be
        # rebuilt after unpickling (see ``__setstate__``).
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.redis = self._connect()

    def _connect(self) -> redis.Redis:
        """
        Build a Redis client from the stored connection parameters.

        ``decode_responses=True`` makes reads return ``str`` rather than ``bytes``, which is
        what the ``int(...)`` parsing in :meth:`_read_value` expects.

        :return: A new Redis client.
        """
        return redis.Redis(
            host=self.host,
            port=self.port,
            db=self.db,
            password=self.password,
            decode_responses=True
        )

    def __getstate__(self) -> Dict[str, object]:
        """
        Support for pickle serialization.

        The live Redis client is not picklable, so it is left out of the state; only the
        connection parameters are pickled.
        """
        state = self.__dict__.copy()
        state.pop('redis', None)
        return state

    def __setstate__(self, state: Dict[str, object]) -> None:
        """
        Support for pickle deserialization.

        Restores the connection parameters and opens a fresh Redis client from them.
        """
        vars(self).update(state)
        self.redis = self._connect()

    def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
                    identifier: int = 1, supplier_prefix: str = "") -> None:
        """
        It allows to set the counter value of graph and provenance entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        self.redis.set(key, new_value)

    def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to read the counter value of graph and provenance entities.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The requested counter value (``0`` if it has never been set).
        """
        return self._read_value(self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix))

    def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the counter value that identifies the relative
         graph entity. The integer value '1' otherwise.
        :type identifier: int
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The newly-updated (already incremented) counter value.
        """
        key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix)
        # INCR is performed server-side, so read-modify-write happens in a single command.
        return cast(int, self.redis.incr(key))

    def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None:
        """
        It allows to set the counter value of metadata entities.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The short name associated to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :raises ValueError: if ``new_value`` is a negative integer
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")

        self.redis.set(self._get_metadata_key(entity_short_name, dataset_name), new_value)

    def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to read the counter value of metadata entities.

        :param entity_short_name: The short name associated to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :return: The requested counter value (``0`` if it has never been set).
        """
        return self._read_value(self._get_metadata_key(entity_short_name, dataset_name))

    def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int:
        """
        It allows to increment the counter value of metadata entities by one unit.

        :param entity_short_name: The short name associated to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset.
        :type dataset_name: str
        :return: The newly-updated (already incremented) counter value.
        """
        return cast(int, self.redis.incr(self._get_metadata_key(entity_short_name, dataset_name)))

    def _read_value(self, key: str) -> int:
        """
        Read an integer counter stored under ``key``.

        :param key: The Redis key to read.
        :type key: str
        :return: The stored value parsed as ``int``, or ``0`` when the key does not exist.
        """
        value = cast(Optional[str], self.redis.get(key))
        return int(value) if value is not None else 0

    @staticmethod
    def _get_metadata_key(entity_short_name: str, dataset_name: str) -> str:
        """
        Build the Redis key of a metadata counter.

        :param entity_short_name: The short name associated to the type of the entity itself.
        :type entity_short_name: str
        :param dataset_name: The name of the dataset the counter belongs to.
        :type dataset_name: str
        :return: A key of the form ``metadata:<dataset_name>:<entity_short_name>``.
        """
        return f"metadata:{dataset_name}:{entity_short_name}"

    def _get_key(self, entity_short_name: str, prov_short_name: str = "", identifier: Union[str, int, None] = None, supplier_prefix: str = "") -> str:
        """
        Generate a Redis key for the given parameters.

        The key is ``entity_short_name:supplier_prefix`` for graph entities and
        ``entity_short_name:supplier_prefix:identifier:prov_short_name`` for provenance
        entities, with empty parts omitted (so no ``::`` appears). ``identifier`` is only
        used when ``prov_short_name`` is non-empty.

        NOTE(review): if ``prov_short_name`` is given but ``identifier`` is ``None``, the
        literal string ``"None"`` becomes part of the key; the public methods always pass an
        integer, so this only matters for direct callers.

        :param entity_short_name: The short name associated either to the type of the entity itself
         or, in case of a provenance entity, to the type of the relative graph entity.
        :type entity_short_name: str
        :param prov_short_name: In case of a provenance entity, the short name associated to the type
         of the entity itself. An empty string otherwise.
        :type prov_short_name: str
        :param identifier: In case of a provenance entity, the identifier of the relative graph entity.
        :type identifier: Union[str, int, None]
        :param supplier_prefix: The supplier prefix
        :type supplier_prefix: str
        :return: The generated Redis key
        :rtype: str
        """
        key_parts = [entity_short_name, supplier_prefix]
        if prov_short_name:
            key_parts.append(str(identifier))
            key_parts.append(prov_short_name)
        # filter(None, ...) drops empty strings, e.g. an empty supplier prefix.
        return ':'.join(filter(None, key_parts))

    def _iter_counter_updates(self, updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]]):
        """
        Lazily yield ``(redis_key, counter_value)`` pairs for every entry of ``updates``,
        in the mapping's iteration order.

        :param updates: The nested mapping accepted by :meth:`batch_update_counters`.
        :type updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]]
        """
        for supplier_prefix, counters_by_type in updates.items():
            for (short_name, prov_short_name), counters in counters_by_type.items():
                for identifier, counter_value in counters.items():
                    yield self._get_key(short_name, prov_short_name, identifier, supplier_prefix), counter_value

    def batch_update_counters(self, updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]],
                              batch_size: int = 1_000_000) -> None:
        """
        Perform batch updates of counters through Redis pipelines, sending ``batch_size``
        ``SET`` commands per round trip and reporting progress with a progress bar.

        All values are validated before anything is written, so an invalid value leaves
        Redis untouched.

        :param updates: A dictionary structure containing the updates.
         The structure is as follows::

             {
                 supplier_prefix: {
                     (short_name, prov_short_name): {
                         identifier: counter_value
                     }
                 }
             }

        :type updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]]
        :param batch_size: How many ``SET`` commands are sent per pipeline execution.
        :type batch_size: int
        :raises ValueError: if ``batch_size`` is not positive or any counter value is negative
        :return: None
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer!")

        # Validate and count in a cheap first pass: nothing is written if a value is invalid,
        # and the progress bar gets its total without materialising every key in memory.
        total_updates = 0
        for counters_by_type in updates.values():
            for counters in counters_by_type.values():
                for counter_value in counters.values():
                    if counter_value < 0:
                        raise ValueError(f"counter values must be non negative integers (got {counter_value})!")
                total_updates += len(counters)

        with tqdm(total=total_updates, desc="Updating counters") as pbar:
            # execute() resets the pipeline, so one pipeline object serves every batch;
            # the context manager resets it on exit even if an error occurs.
            with self.redis.pipeline() as pipeline:
                pending = 0
                for key, counter_value in self._iter_counter_updates(updates):
                    pipeline.set(key, counter_value)
                    pending += 1
                    if pending == batch_size:
                        pipeline.execute()
                        pbar.update(pending)
                        pending = 0
                if pending:
                    pipeline.execute()
                    pbar.update(pending)

231