Coverage for oc_ds_converter / datasource / redis.py: 46%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# 

4# SPDX-License-Identifier: ISC 

5from __future__ import annotations 

6 

7# 

8 

9import configparser 

10import json 

11import os 

12from os.path import join 

13from typing import cast 

14 

15import fakeredis 

16import redis 

17 

18from oc_ds_converter.datasource.datasource import DataSource 

19 

20 

class FakeRedisWrapper:
    """In-memory Redis stand-in built on ``fakeredis.FakeStrictRedis``.

    The underlying client is created without ``decode_responses``, so stored
    values and set members are returned as ``bytes``. Each method forwards to
    the Redis command of the same name. The only extra logic is:

    - empty key lists are short-circuited to ``[]``;
    - SCARD counts are turned into booleans.
    """

    def __init__(self) -> None:
        self._r = fakeredis.FakeStrictRedis()

    def get(self, resource_id: str) -> bytes | None:
        """Return the raw value stored under *resource_id*, or None if absent."""
        raw_value = self._r.get(resource_id)
        return cast(bytes | None, raw_value)

    def mget(self, resources_id: list[str]) -> list[bytes | None]:
        """Fetch several keys in one MGET; an empty key list yields ``[]``."""
        if resources_id:
            return cast(list[bytes | None], self._r.mget(resources_id))
        return []

    def set(self, resource_id: str, value: str | bytes) -> bool | None:
        """Store *value* under *resource_id* and return the client's SET reply."""
        reply = self._r.set(resource_id, value)
        return cast(bool | None, reply)

    def sadd(self, resource_id: str, *values: str) -> int:
        """Add *values* to the set at *resource_id*; return the SADD count."""
        added = self._r.sadd(resource_id, *values)
        return cast(int, added)

    def smembers(self, resource_id: str) -> set[bytes]:
        """Return all members of the set at *resource_id* (as bytes)."""
        members = self._r.smembers(resource_id)
        return cast(set[bytes], members)

    def delete(self, resource_id: str) -> None:
        """Remove *resource_id* from the store."""
        self._r.delete(resource_id)

    def flushdb(self) -> None:
        """Drop every key in the current fake database."""
        self._r.flushdb()

    def exists_as_set(self, resource_id: str) -> bool:
        """True when *resource_id* holds a set with at least one member."""
        cardinality = cast(int, self._r.scard(resource_id))
        return cardinality > 0

    def mexists_as_set(self, resources_id: list[str]) -> list[bool]:
        """Batch version of ``exists_as_set`` using a single pipeline round-trip."""
        if not resources_id:
            return []
        pipeline = self._r.pipeline()
        for key in resources_id:
            pipeline.scard(key)
        counts: list[int] = cast(list[int], pipeline.execute())
        return [n > 0 for n in counts]

59 

60 

class RedisDataSource(DataSource):
    """DataSource backed by a Redis server configured from an INI file.

    The Redis logical database is chosen per service through
    ``_SERVICE_TO_DB_SECTION``. Each mapped config section must provide a
    ``db`` option. The ``[redis]`` section provides ``host``, ``port`` and,
    optionally, ``password``.

    Writes (``set`` / ``mset``) JSON-encode their values. The client is
    created with ``decode_responses=True``, so reads come back as ``str``.
    ``get`` / ``mget`` return that text unchanged; JSON decoding is only
    applied to ``bytes`` payloads.

    NOTE(review): a value written with ``set`` is therefore read back by
    ``get`` as its JSON text (e.g. ``'"abc"'``), not as the original
    object. Confirm callers expect this before changing it.
    """

    # Maps a service name to the config section that holds its ``db`` number.
    _SERVICE_TO_DB_SECTION: dict[str, str] = {
        "DB-META-RA": "database 0",
        "DB-META-BR": "database 1",
        "PROCESS-DB": "database 2",
        "DOI-ORCID-INDEX": "database 3",
        "PUBLISHERS-INDEX": "database 4",
    }

    def __init__(self, service: str, config_filepath: str = 'config.ini') -> None:
        """Connect to the Redis database assigned to *service*.

        :param service: one of the keys of ``_SERVICE_TO_DB_SECTION``.
        :param config_filepath: path of the INI config file. The default
            ``'config.ini'`` is resolved in this module's directory; any
            other value is passed to ``ConfigParser.read`` unchanged.
        :raises ValueError: if *service* is unknown, or if ``port`` / ``db``
            is not an integer.
        :raises configparser.Error: if a required section or option is
            missing. ``ConfigParser.read`` silently skips unreadable files,
            so a missing config file also surfaces here, as ``NoSectionError``.
        """
        super().__init__(service)
        if service not in self._SERVICE_TO_DB_SECTION:
            raise ValueError(f"Unknown service: {service}")

        config = configparser.ConfigParser(allow_no_value=True)
        cur_path = os.path.dirname(os.path.abspath(__file__))
        conf_file = config_filepath if config_filepath != 'config.ini' else join(cur_path, config_filepath)
        config.read(conf_file)

        host = config.get('redis', 'host')
        port = int(config.get('redis', 'port'))
        # A missing, value-less or empty ``password`` option all mean "no auth".
        password = config.get('redis', 'password', fallback=None) or None
        db_section = self._SERVICE_TO_DB_SECTION[service]
        db = int(config.get(db_section, 'db'))

        self._r = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True
        )

    def get(self, resource_id: str) -> str | int | object | None:
        """Return the value stored under *resource_id*, or None if absent.

        ``str``/``int`` replies are returned as-is; ``bytes`` replies are
        JSON-decoded.
        """
        redis_data = self._r.get(resource_id)
        if redis_data is None:
            return None
        if isinstance(redis_data, (str, int)):
            return redis_data
        if isinstance(redis_data, bytes):
            # json.loads accepts bytes directly. The previous str(redis_data)
            # produced "b'...'", which is never valid JSON.
            return json.loads(redis_data)
        return json.loads(str(redis_data))

    def mget(self, resources_id: list[str]) -> list[object | None]:
        """Fetch several keys in one MGET, preserving input order.

        Missing keys and falsy replies (including an empty string) become
        None. Note that ``get`` returns ``""`` for an empty string instead.
        ``str``/``int``/``bool`` replies pass through, and ``bytes`` replies
        are JSON-decoded.
        """
        if not resources_id:
            return []
        result: list[object | None] = []
        raw_results = cast(list[str | None], self._r.mget(resources_id))
        for raw in raw_results:
            if raw and isinstance(raw, (int, str, bool)):
                result.append(raw)
            elif raw and isinstance(raw, bytes):
                result.append(json.loads(raw))
            else:
                result.append(None)
        return result

    def flushdb(self) -> None:
        """Delete every key in the configured Redis database."""
        self._r.flushdb()

    def delete(self, resource_id: str) -> None:
        """Remove *resource_id* from Redis."""
        self._r.delete(resource_id)

    def scan_iter(self, match: str = "*") -> object:
        """Return redis-py's ``scan_iter`` iterator over keys matching *match*."""
        return self._r.scan_iter(match=match)

    def set(self, resource_id: str, value: object) -> bool | None:
        """Store the JSON encoding of *value* under *resource_id*."""
        return cast(bool | None, self._r.set(resource_id, json.dumps(value)))

    def mset(self, resources: dict[str, object]) -> bool | None:
        """JSON-encode and store every item of *resources* in one MSET.

        Returns None without contacting Redis when *resources* is empty.
        """
        if resources:
            return cast(bool | None, self._r.mset({k: json.dumps(v) for k, v in resources.items()}))
        return None

    def sadd(self, resource_id: str, *values: str) -> int:
        """Add *values* to the set at *resource_id*; return the SADD count."""
        return cast(int, self._r.sadd(resource_id, *values))

    def smembers(self, resource_id: str) -> set[str]:
        """Return all members of the set at *resource_id*."""
        return cast(set[str], self._r.smembers(resource_id))

    def exists_as_set(self, resource_id: str) -> bool:
        """True when *resource_id* holds a set with at least one member."""
        return cast(int, self._r.scard(resource_id)) > 0

    def mexists_as_set(self, resources_id: list[str]) -> list[bool]:
        """Batch ``exists_as_set`` for many keys using a single pipeline."""
        if not resources_id:
            return []
        pipe = self._r.pipeline()
        for rid in resources_id:
            pipe.scard(rid)
        results: list[int] = cast(list[int], pipe.execute())
        return [count > 0 for count in results]