Coverage for oc_ds_converter / datasource / redis.py: 46%
99 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3#
4# SPDX-License-Identifier: ISC
5from __future__ import annotations
7#
9import configparser
10import json
11import os
12from os.path import join
13from typing import cast
15import fakeredis
16import redis
18from oc_ds_converter.datasource.datasource import DataSource
21class FakeRedisWrapper:
22 def __init__(self) -> None:
23 self._r = fakeredis.FakeStrictRedis()
25 def get(self, resource_id: str) -> bytes | None:
26 return cast(bytes | None, self._r.get(resource_id))
28 def mget(self, resources_id: list[str]) -> list[bytes | None]:
29 if not resources_id:
30 return []
31 return cast(list[bytes | None], self._r.mget(resources_id))
33 def set(self, resource_id: str, value: str | bytes) -> bool | None:
34 return cast(bool | None, self._r.set(resource_id, value))
36 def sadd(self, resource_id: str, *values: str) -> int:
37 return cast(int, self._r.sadd(resource_id, *values))
39 def smembers(self, resource_id: str) -> set[bytes]:
40 return cast(set[bytes], self._r.smembers(resource_id))
42 def delete(self, resource_id: str) -> None:
43 self._r.delete(resource_id)
45 def flushdb(self) -> None:
46 self._r.flushdb()
48 def exists_as_set(self, resource_id: str) -> bool:
49 return cast(int, self._r.scard(resource_id)) > 0
51 def mexists_as_set(self, resources_id: list[str]) -> list[bool]:
52 if not resources_id:
53 return []
54 pipe = self._r.pipeline()
55 for rid in resources_id:
56 pipe.scard(rid)
57 results: list[int] = cast(list[int], pipe.execute())
58 return [count > 0 for count in results]
class RedisDataSource(DataSource):
    """Redis-backed ``DataSource`` configured from an INI file.

    The service name selects which ``database N`` section of the configuration
    supplies the Redis database index. The client is created with
    ``decode_responses=True``.
    """

    # Maps each supported service name to the config section holding its ``db`` index.
    _SERVICE_TO_DB_SECTION: dict[str, str] = {
        "DB-META-RA": "database 0",
        "DB-META-BR": "database 1",
        "PROCESS-DB": "database 2",
        "DOI-ORCID-INDEX": "database 3",
        "PUBLISHERS-INDEX": "database 4",
    }

    def __init__(self, service: str, config_filepath: str = 'config.ini') -> None:
        """Read the connection settings and create the Redis client.

        Args:
            service: One of the keys of ``_SERVICE_TO_DB_SECTION``.
            config_filepath: Path of the INI file. The default ``'config.ini'``
                is looked up in the directory of this module; any other value
                is used exactly as given.

        Raises:
            ValueError: If ``service`` is not a known service name.
        """
        super().__init__(service)
        if service not in self._SERVICE_TO_DB_SECTION:
            raise ValueError(f"Unknown service: {service}")

        config = configparser.ConfigParser(allow_no_value=True)
        cur_path = os.path.dirname(os.path.abspath(__file__))
        conf_file = config_filepath if config_filepath != 'config.ini' else join(cur_path, config_filepath)
        config.read(conf_file)

        host = config.get('redis', 'host')
        port = int(config.get('redis', 'port'))
        # An empty or value-less password entry means "no password".
        password = config.get('redis', 'password') or None
        db_section = self._SERVICE_TO_DB_SECTION[service]
        db = int(config.get(db_section, 'db'))

        self._r = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True
        )

    def get(self, resource_id: str) -> str | int | object | None:
        """Return the value stored under ``resource_id``, or ``None`` if absent.

        ``str`` and ``int`` replies are returned unchanged (no JSON decoding).
        ``bytes``/``bytearray`` replies are parsed as JSON. Any other reply
        type is stringified and parsed as JSON.
        """
        redis_data = self._r.get(resource_id)
        if redis_data is None:
            return None
        if isinstance(redis_data, (str, int)):
            return redis_data
        if isinstance(redis_data, (bytes, bytearray)):
            # json.loads decodes bytes itself; str(bytes) would produce the
            # literal "b'...'", which is never valid JSON.
            return json.loads(redis_data)
        return json.loads(str(redis_data))

    def mget(self, resources_id: list[str]) -> list[object | None]:
        """Return the values for ``resources_id`` in order.

        An empty request yields ``[]`` without calling Redis. Truthy
        ``int``/``str``/``bool`` replies are returned unchanged, truthy
        ``bytes`` replies are parsed as JSON, and everything else becomes
        ``None``.

        NOTE(review): unlike ``get``, falsy replies such as an empty string
        are reported as ``None`` here; confirm callers rely on this.
        """
        if not resources_id:
            return []
        result: list[object | None] = []
        raw_results = cast(list[str | None], self._r.mget(resources_id))
        for x in raw_results:
            if x and isinstance(x, (int, str, bool)):
                result.append(x)
            elif x and isinstance(x, bytes):
                result.append(json.loads(x))
            else:
                result.append(None)
        return result

    def flushdb(self) -> None:
        """Flush the selected Redis database."""
        self._r.flushdb()

    def delete(self, resource_id: str) -> None:
        """Delete ``resource_id`` from Redis."""
        self._r.delete(resource_id)

    def scan_iter(self, match: str = "*") -> object:
        """Return the client's ``scan_iter`` iterator for keys matching ``match``."""
        return self._r.scan_iter(match=match)

    def set(self, resource_id: str, value: object) -> bool | None:
        """Store ``value`` under ``resource_id`` as a JSON string."""
        return cast(bool | None, self._r.set(resource_id, json.dumps(value)))

    def mset(self, resources: dict[str, object]) -> bool | None:
        """Store every value of ``resources`` as a JSON string in one call.

        Returns ``None`` without calling Redis when ``resources`` is empty.
        """
        if resources:
            return cast(bool | None, self._r.mset({k: json.dumps(v) for k, v in resources.items()}))
        return None

    def sadd(self, resource_id: str, *values: str) -> int:
        """Add ``values`` to the set at ``resource_id`` and return the client's reply."""
        return cast(int, self._r.sadd(resource_id, *values))

    def smembers(self, resource_id: str) -> set[str]:
        """Return the members of the set stored at ``resource_id``."""
        return cast(set[str], self._r.smembers(resource_id))

    def exists_as_set(self, resource_id: str) -> bool:
        """Return ``True`` when the set cardinality of ``resource_id`` is above zero."""
        return cast(int, self._r.scard(resource_id)) > 0

    def mexists_as_set(self, resources_id: list[str]) -> list[bool]:
        """Return, per key and in order, whether its set cardinality is above zero.

        All ``scard`` calls are queued on one pipeline and executed together.
        An empty request yields ``[]`` without calling Redis.
        """
        if not resources_id:
            return []
        pipe = self._r.pipeline()
        for rid in resources_id:
            pipe.scard(rid)
        results: list[int] = cast(list[int], pipe.execute())
        return [count > 0 for count in results]