Coverage for rdflib_ocdm/counter_handler/redis_counter_handler.py: 100%
32 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-11-01 22:02 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-11-01 22:02 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2023, Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
17import redis
20class RedisCounterHandler:
21 def __init__(self, host, port, db: int, password=None):
22 self.host = host
23 self.port = port
24 self.db = db
25 self.password = password
26 self.connection = None
28 def connect(self):
29 self.connection = redis.Redis(host=self.host, port=self.port, db=self.db, password=self.password)
31 def disconnect(self):
32 if self.connection:
33 self.connection.close()
35 def set_counter(self, new_value: int, entity_name: str) -> None:
36 entity_name = str(entity_name)
37 if new_value < 0:
38 raise ValueError("new_value must be a non negative integer!")
39 self.connection.set(entity_name, new_value)
41 def read_counter(self, entity_name: str) -> int:
42 entity_name = str(entity_name)
43 result = self.connection.get(entity_name)
44 if result:
45 return int(result.decode('utf-8'))
46 else:
47 return 0
49 def increment_counter(self, entity_name: str) -> int:
50 entity_name = str(entity_name)
51 cur_count = self.read_counter(entity_name)
52 count = cur_count + 1
53 self.set_counter(count, entity_name)
54 return count
56 def flush(self):
57 self.connection.flushdb()