Coverage for oc_ds_converter / oc_idmanager / oc_data_storage / batch_manager.py: 100%

21 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7from oc_ds_converter.oc_idmanager.oc_data_storage.storage_manager import StorageManager 

8 

9 

class BatchManager(StorageManager):
    """In-memory staging area for identifier validation results.

    Results live in a plain dict keyed by the stringified identifier, so they
    can be batched before being written to Redis.
    """

    def __init__(self, **params: object) -> None:
        """Forward ``params`` to ``StorageManager`` and start with an empty batch."""
        super().__init__(**params)
        # Maps identifier -> {"valid": <bool>}.
        self._data: dict[str, dict[str, bool]] = {}

    def set_value(self, id: str, value: bool) -> None:
        """Record ``value`` as the validity of ``id``, replacing any earlier result."""
        # Reuse the existing entry when there is one; otherwise create it.
        self._data.setdefault(str(id), {})["valid"] = value

    def get_value(self, id: str) -> bool | None:
        """Return the validity stored for ``id``, or ``None`` when nothing is stored."""
        entry = self._data.get(str(id))
        return entry["valid"] if entry else None

    def get_validity_list_of_tuples(self) -> list[tuple[str, bool]]:
        """Return every stored result as ``(identifier, validity)`` pairs."""
        return [
            (identifier, entry["valid"])
            for identifier, entry in self._data.items()
        ]

    def delete_storage(self) -> None:
        """Drop all batched results by rebinding the store to a new empty dict."""
        self._data = {}