Coverage for oc_ocdm / counter_handler / sqlite_counter_handler.py: 88%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-28 18:52 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7# -*- coding: utf-8 -*- 

8 

9import sqlite3 

10 

11from oc_ocdm.counter_handler.counter_handler import CounterHandler 

12 

13 

class SqliteCounterHandler(CounterHandler):
    """A concrete implementation of the ``CounterHandler`` interface that persistently stores
    the counter values within a SQLite database.

    Counters are kept in a single ``info`` table with two columns: ``entity`` (``TEXT``,
    primary key) holding the lookup key and ``count`` (``INTEGER``) holding its value.
    Every statement is issued through one shared cursor (``self.cur``) created on one
    connection (``self.con``).
    """

    def __init__(self, database: str) -> None:
        """
        Constructor of the ``SqliteCounterHandler`` class.

        Opens the database and creates the ``info`` table if it does not exist yet.

        :param database: The name of the database
        :type database: str
        """
        self.database = database
        self._connect()
        self.cur.execute("""CREATE TABLE IF NOT EXISTS info(
            entity TEXT PRIMARY KEY,
            count INTEGER)""")

    def _connect(self) -> None:
        """
        Open the connection to ``self.database`` and create the shared cursor.

        Shared by the constructor and by ``__setstate__`` so that both build the
        connection in exactly the same way.

        :return: None
        """
        # NOTE(review): this rebinds the module-level ``sqlite3.threadsafety`` attribute for the
        # whole process. It presumably does not change the threading mode of the underlying
        # SQLite library -- confirm whether it is still needed. Kept as-is because other code
        # may read the attribute.
        sqlite3.threadsafety = 3
        self.con = sqlite3.connect(self.database)
        self.cur = self.con.cursor()

    def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",  # type: ignore[override]
                    identifier: int = 1, supplier_prefix: str = "") -> None:
        """
        It allows to set the counter value of provenance entities.

        In this implementation, ``entity_short_name`` is used as a generic entity key
        (which may be a URI string when called from ProvSet with a GraphEntity).
        The row is inserted, or replaced if the key already exists, and the change is
        committed immediately.

        :param new_value: The new counter value to be set
        :type new_value: int
        :param entity_short_name: The entity name (used as lookup key)
        :type entity_short_name: str
        :param prov_short_name: Accepted for interface compatibility; not used by this implementation
        :type prov_short_name: str
        :param identifier: Accepted for interface compatibility; not used by this implementation
        :type identifier: int
        :param supplier_prefix: Accepted for interface compatibility; not used by this implementation
        :type supplier_prefix: str
        :raises ValueError: if ``new_value`` is a negative integer.
        :return: None
        """
        if new_value < 0:
            raise ValueError("new_value must be a non negative integer!")
        # Bound parameters instead of string interpolation: keys may be URIs and can contain
        # quote characters, which would otherwise break (or inject into) the statement.
        # ``str()`` keeps the previous behaviour of stringifying whatever key object is passed.
        self.cur.execute(
            "INSERT OR REPLACE INTO info (entity, count) VALUES (?, ?)",
            (str(entity_short_name), new_value),
        )
        self.con.commit()

    def read_counter(self, entity_short_name: str, prov_short_name: str = "",  # type: ignore[override]
                     identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to read the counter value of provenance entities.

        :param entity_short_name: The entity name (used as lookup key)
        :type entity_short_name: str
        :param prov_short_name: Accepted for interface compatibility; not used by this implementation
        :type prov_short_name: str
        :param identifier: Accepted for interface compatibility; not used by this implementation
        :type identifier: int
        :param supplier_prefix: Accepted for interface compatibility; not used by this implementation
        :type supplier_prefix: str
        :raises Exception: if more than one row is stored for the requested key.
        :return: The requested counter value, or 0 when no row exists for the key.
        """
        # Parameterized for the same reason as in ``set_counter``: the key may contain quotes.
        rows = self.cur.execute(
            "SELECT count FROM info WHERE entity=?",
            (str(entity_short_name),),
        ).fetchall()
        if not rows:
            # A key that was never set reads as zero.
            return 0
        if len(rows) == 1:
            return rows[0][0]
        # ``entity`` is declared as the primary key by the constructor, so this can only be
        # reached if the table was created elsewhere with a different schema.
        raise Exception("There is more than one counter for this entity. The databse id broken")

    def increment_counter(self, entity_short_name: str, prov_short_name: str = "",  # type: ignore[override]
                          identifier: int = 1, supplier_prefix: str = "") -> int:
        """
        It allows to increment the counter value of graph and provenance entities by one unit.

        The increment is performed as a read followed by a write (two separate statements).

        :param entity_short_name: The entity name (used as lookup key)
        :type entity_short_name: str
        :param prov_short_name: Accepted for interface compatibility; not used by this implementation
        :type prov_short_name: str
        :param identifier: Accepted for interface compatibility; not used by this implementation
        :type identifier: int
        :param supplier_prefix: Accepted for interface compatibility; not used by this implementation
        :type supplier_prefix: str
        :return: The newly-updated (already incremented) counter value.
        """
        count = self.read_counter(entity_short_name) + 1
        self.set_counter(count, entity_short_name)
        return count

    def increment_metadata_counter(self, entity_short_name: str = "", dataset_name: str = "") -> int:  # type: ignore[override]
        """
        No-op in this implementation: nothing is stored and 0 is always returned.

        :param entity_short_name: Ignored
        :type entity_short_name: str
        :param dataset_name: Ignored
        :type dataset_name: str
        :return: Always 0.
        """
        return 0

    def read_metadata_counter(self, entity_short_name: str = "", dataset_name: str = "") -> int:  # type: ignore[override]
        """
        No-op in this implementation: nothing is read and 0 is always returned.

        :param entity_short_name: Ignored
        :type entity_short_name: str
        :param dataset_name: Ignored
        :type dataset_name: str
        :return: Always 0.
        """
        return 0

    def set_metadata_counter(self, new_value: int = 0, entity_short_name: str = "", dataset_name: str = "") -> None:  # type: ignore[override]
        """
        No-op in this implementation: the arguments are ignored and nothing is stored.

        :param new_value: Ignored
        :type new_value: int
        :param entity_short_name: Ignored
        :type entity_short_name: str
        :param dataset_name: Ignored
        :type dataset_name: str
        :return: None
        """
        pass

    def __getstate__(self):
        """
        Support for pickle serialization.

        Exclude the SQLite connection and cursor objects, which are not picklable.
        The database path is preserved and the connection will be recreated upon unpickling.

        :return: A copy of the instance dictionary without ``con`` and ``cur``.
        """
        state = self.__dict__.copy()
        # ``pop`` with a default tolerates an instance whose connection was never created.
        state.pop('con', None)
        state.pop('cur', None)
        return state

    def __setstate__(self, state: dict[str, object]) -> None:
        """
        Support for pickle deserialization.

        Recreates the SQLite connection and cursor after unpickling. The ``info`` table
        is not re-created here.

        :param state: The instance dictionary produced by ``__getstate__``
        :type state: dict[str, object]
        :return: None
        """
        vars(self).update(state)
        self._connect()

111 sqlite3.threadsafety = 3 

112 self.con = sqlite3.connect(self.database) 

113 self.cur = self.con.cursor()