Coverage for oc_ds_converter / oc_idmanager / oc_data_storage / batch_manager.py: 100%
21 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager
10class BatchManager(StorageManager):
11 """A simple in-memory dict wrapper for batching validation results before writing to Redis."""
13 def __init__(self, **params: object) -> None:
14 super().__init__(**params)
15 self._data: dict[str, dict[str, bool]] = {}
17 def set_value(self, id: str, value: bool) -> None:
18 id_name = str(id)
19 if id_name in self._data:
20 self._data[id_name]["valid"] = value
21 else:
22 self._data[id_name] = {"valid": value}
24 def get_value(self, id: str) -> bool | None:
25 id_name = str(id)
26 id_in_dict = self._data.get(id_name)
27 if id_in_dict:
28 return id_in_dict["valid"]
29 return None
31 def get_validity_list_of_tuples(self) -> list[tuple[str, bool]]:
32 return [(k, v["valid"]) for k, v in self._data.items()]
34 def delete_storage(self) -> None:
35 self._data = {}