Coverage for oc_ocdm/counter_handler/redis_counter_handler.py: 73%

60 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-05-30 22:05 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2024, Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17from typing import Dict, Optional, Tuple, Union 

18 

19import redis 

20from tqdm import tqdm 

21 

22from oc_ocdm.counter_handler.counter_handler import CounterHandler 

23 

24 

25class RedisCounterHandler(CounterHandler): 

26 """A concrete implementation of the ``CounterHandler`` interface that persistently stores 

27 the counter values within a Redis database.""" 

28 

29 def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0, password: Optional[str] = None) -> None: 

30 """ 

31 Constructor of the ``RedisCounterHandler`` class. 

32 

33 :param host: Redis server host 

34 :type host: str 

35 :param port: Redis server port 

36 :type port: int 

37 :param db: Redis database number 

38 :type db: int 

39 :param password: Redis password (if required) 

40 :type password: Optional[str] 

41 """ 

42 self.redis = redis.Redis(host=host, port=port, db=db, password=password, decode_responses=True) 

43 

44 def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "", 

45 identifier: int = 1, supplier_prefix: str = "") -> None: 

46 """ 

47 It allows to set the counter value of graph and provenance entities. 

48 

49 :param new_value: The new counter value to be set 

50 :type new_value: int 

51 :param entity_short_name: The short name associated either to the type of the entity itself 

52 or, in case of a provenance entity, to the type of the relative graph entity. 

53 :type entity_short_name: str 

54 :param prov_short_name: In case of a provenance entity, the short name associated to the type 

55 of the entity itself. An empty string otherwise. 

56 :type prov_short_name: str 

57 :param identifier: In case of a provenance entity, the counter value that identifies the relative 

58 graph entity. The integer value '1' otherwise. 

59 :type identifier: int 

60 :param supplier_prefix: The supplier prefix 

61 :type supplier_prefix: str 

62 :raises ValueError: if ``new_value`` is a negative integer 

63 :return: None 

64 """ 

65 if new_value < 0: 

66 raise ValueError("new_value must be a non negative integer!") 

67 

68 key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix) 

69 self.redis.set(key, new_value) 

70 

71 def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int: 

72 """ 

73 It allows to read the counter value of graph and provenance entities. 

74 

75 :param entity_short_name: The short name associated either to the type of the entity itself 

76 or, in case of a provenance entity, to the type of the relative graph entity. 

77 :type entity_short_name: str 

78 :param prov_short_name: In case of a provenance entity, the short name associated to the type 

79 of the entity itself. An empty string otherwise. 

80 :type prov_short_name: str 

81 :param identifier: In case of a provenance entity, the counter value that identifies the relative 

82 graph entity. The integer value '1' otherwise. 

83 :type identifier: int 

84 :param supplier_prefix: The supplier prefix 

85 :type supplier_prefix: str 

86 :return: The requested counter value. 

87 """ 

88 key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix) 

89 value = self.redis.get(key) 

90 return int(value) if value is not None else 0 

91 

92 def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int: 

93 """ 

94 It allows to increment the counter value of graph and provenance entities by one unit. 

95 

96 :param entity_short_name: The short name associated either to the type of the entity itself 

97 or, in case of a provenance entity, to the type of the relative graph entity. 

98 :type entity_short_name: str 

99 :param prov_short_name: In case of a provenance entity, the short name associated to the type 

100 of the entity itself. An empty string otherwise. 

101 :type prov_short_name: str 

102 :param identifier: In case of a provenance entity, the counter value that identifies the relative 

103 graph entity. The integer value '1' otherwise. 

104 :type identifier: int 

105 :param supplier_prefix: The supplier prefix 

106 :type supplier_prefix: str 

107 :return: The newly-updated (already incremented) counter value. 

108 """ 

109 key = self._get_key(entity_short_name, prov_short_name, identifier, supplier_prefix) 

110 return self.redis.incr(key) 

111 

112 def set_metadata_counter(self, new_value: int, entity_short_name: str, dataset_name: str) -> None: 

113 """ 

114 It allows to set the counter value of metadata entities. 

115 

116 :param new_value: The new counter value to be set 

117 :type new_value: int 

118 :param entity_short_name: The short name associated either to the type of the entity itself. 

119 :type entity_short_name: str 

120 :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset. 

121 :type dataset_name: str 

122 :raises ValueError: if ``new_value`` is a negative integer or ``dataset_name`` is None 

123 :return: None 

124 """ 

125 if new_value < 0: 

126 raise ValueError("new_value must be a non negative integer!") 

127 

128 if dataset_name is None: 

129 raise ValueError("dataset_name must be provided!") 

130 

131 key = f"metadata:{dataset_name}:{entity_short_name}" 

132 self.redis.set(key, new_value) 

133 

134 def read_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int: 

135 """ 

136 It allows to read the counter value of metadata entities. 

137 

138 :param entity_short_name: The short name associated either to the type of the entity itself. 

139 :type entity_short_name: str 

140 :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset. 

141 :type dataset_name: str 

142 :raises ValueError: if ``dataset_name`` is None 

143 :return: The requested counter value. 

144 """ 

145 if dataset_name is None: 

146 raise ValueError("dataset_name must be provided!") 

147 

148 key = f"metadata:{dataset_name}:{entity_short_name}" 

149 value = self.redis.get(key) 

150 return int(value) if value is not None else 0 

151 

152 def increment_metadata_counter(self, entity_short_name: str, dataset_name: str) -> int: 

153 """ 

154 It allows to increment the counter value of metadata entities by one unit. 

155 

156 :param entity_short_name: The short name associated either to the type of the entity itself. 

157 :type entity_short_name: str 

158 :param dataset_name: In case of a ``Dataset``, its name. Otherwise, the name of the relative dataset. 

159 :type dataset_name: str 

160 :raises ValueError: if ``dataset_name`` is None 

161 :return: The newly-updated (already incremented) counter value. 

162 """ 

163 if dataset_name is None: 

164 raise ValueError("dataset_name must be provided!") 

165 

166 key = f"metadata:{dataset_name}:{entity_short_name}" 

167 return self.redis.incr(key) 

168 

169 def _get_key(self, entity_short_name: str, prov_short_name: str = "", identifier: Union[str, int, None] = None, supplier_prefix: str = "") -> str: 

170 """ 

171 Generate a Redis key for the given parameters. 

172 

173 :param entity_short_name: The short name associated either to the type of the entity itself 

174 or, in case of a provenance entity, to the type of the relative graph entity. 

175 :type entity_short_name: str 

176 :param prov_short_name: In case of a provenance entity, the short name associated to the type 

177 of the entity itself. An empty string otherwise. 

178 :type prov_short_name: str 

179 :param identifier: In case of a provenance entity, the identifier of the relative graph entity. 

180 :type identifier: Union[str, int, None] 

181 :param supplier_prefix: The supplier prefix 

182 :type supplier_prefix: str 

183 :return: The generated Redis key 

184 :rtype: str 

185 """ 

186 key_parts = [entity_short_name, supplier_prefix] 

187 if prov_short_name: 

188 key_parts.append(str(identifier)) 

189 key_parts.append(prov_short_name) 

190 return ':'.join(filter(None, key_parts)) 

191 

192 def batch_update_counters(self, updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]]) -> None: 

193 """ 

194 Perform batch updates of counters, processing 1 million at a time with a progress bar. 

195 

196 :param updates: A dictionary structure containing the updates. 

197 The structure is as follows: 

198 { 

199 supplier_prefix: { 

200 (short_name, prov_short_name): { 

201 identifier: counter_value 

202 } 

203 } 

204 } 

205 :type updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]] 

206 """ 

207 all_updates = [] 

208 for supplier_prefix, value in updates.items(): 

209 for (short_name, prov_short_name), counters in value.items(): 

210 for identifier, counter_value in counters.items(): 

211 key = self._get_key(short_name, prov_short_name, identifier, supplier_prefix) 

212 all_updates.append((key, counter_value)) 

213 

214 total_updates = len(all_updates) 

215 batch_size = 1_000_000 

216 

217 with tqdm(total=total_updates, desc="Updating counters") as pbar: 

218 for i in range(0, total_updates, batch_size): 

219 batch = all_updates[i:i+batch_size] 

220 pipeline = self.redis.pipeline() 

221 for key, value in batch: 

222 pipeline.set(key, value) 

223 pipeline.execute() 

224 pbar.update(len(batch))