Coverage for rdflib_ocdm / prov / provenance.py: 94%

119 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-21 12:35 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2023-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7from __future__ import annotations 

8 

9from typing import TYPE_CHECKING 

10 

11if TYPE_CHECKING: 

12 from rdflib_ocdm.ocdm_graph import OCDMGraphCommons 

13 from typing import List, Dict, Optional 

14 

15from collections import OrderedDict 

16from datetime import datetime, timezone 

17 

18from rdflib import Dataset, URIRef 

19 

20from rdflib_ocdm.counter_handler.counter_handler import CounterHandler 

21from rdflib_ocdm.counter_handler.in_memory_counter_handler import \ 

22 InMemoryCounterHandler 

23from rdflib_ocdm.prov.prov_entity import ProvEntity 

24from rdflib_ocdm.prov.snapshot_entity import SnapshotEntity 

25from rdflib_ocdm.query_utils import get_update_query 

26from rdflib_ocdm.support import get_prov_count 

27 

28 

class OCDMProvenance(Dataset):
    """Dataset that builds provenance snapshots for the subjects of an OCDM graph.

    Snapshot numbering is delegated to a counter handler, and every snapshot
    resource is addressed as ``<subject>/prov/se/<count>``.
    """

    def __init__(self, prov_subj_graph: OCDMGraphCommons, counter_handler: Optional[CounterHandler] = None):
        """Initialise an empty provenance dataset bound to ``prov_subj_graph``.

        :param prov_subj_graph: graph whose ``entity_index`` and ``merge_index``
            are read when provenance is generated.
        :param counter_handler: handler storing the snapshot counters; an
            ``InMemoryCounterHandler`` is created when omitted.
        """
        Dataset.__init__(self)
        self.prov_g = prov_subj_graph
        # Maps the URIRef of a provenance resource to the related provenance entity.
        # NOTE(review): nothing in this class writes to it; presumably the entity
        # constructors register themselves here - confirm.
        self.res_to_entity: Dict[URIRef, ProvEntity] = dict()
        self.all_entities = set()
        if counter_handler is None:
            counter_handler = InMemoryCounterHandler()
        self.counter_handler = counter_handler

    def generate_provenance(self, c_time: Optional[float] = None) -> None:
        """Create the snapshots required by the subjects in ``self.prov_g.entity_index``.

        For each subject the kind of snapshot is chosen as follows:

        * ``to_be_deleted`` is set: deletion snapshot;
        * ``is_restored`` is set: restoration snapshot;
        * the counter handler knows no previous snapshot: creation snapshot;
        * at least one merged entity has a snapshot: merge snapshot;
        * otherwise, when the update query is non-empty: modification snapshot.

        A subject matching none of these cases produces no snapshot.

        :param c_time: POSIX timestamp used for generation/invalidation times;
            the current UTC time is used when omitted. Microseconds are dropped.
        """
        if c_time is None:
            moment = datetime.now(tz=timezone.utc)
        else:
            moment = datetime.fromtimestamp(c_time, tz=timezone.utc)
        cur_time: str = moment.replace(microsecond=0).isoformat(sep="T")
        merge_index = self.prov_g.merge_index
        # Stable sort: subjects flagged 'to_be_deleted' are moved after all the
        # others, which otherwise keep their relative order.
        prov_g_subjects = sorted(
            self.prov_g.entity_index.items(),
            key=lambda x: not x[1]['to_be_deleted'],
            reverse=True,
        )
        for cur_subj, cur_subj_metadata in prov_g_subjects:
            last_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(str(cur_subj))
            if cur_subj_metadata['to_be_deleted']:
                update_query: str = get_update_query(self.prov_g, cur_subj)[0]
                # DELETION SNAPSHOT
                # NOTE(review): when no previous snapshot exists (last_snapshot_res
                # is None) add_se() allocates a fresh counter value for the
                # "previous" snapshot as well - confirm this is intended.
                last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                last_snapshot.has_invalidation_time(cur_time)

                cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                cur_snapshot.derives_from(last_snapshot)
                cur_snapshot.has_invalidation_time(cur_time)
                cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been deleted.")
                cur_snapshot.has_update_action(update_query)
            elif cur_subj_metadata['is_restored']:
                # RESTORATION SNAPSHOT
                last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                # The invalidation time of the previous snapshot is deliberately
                # not set when the entity is restored.

                cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                cur_snapshot.derives_from(last_snapshot)
                cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been restored.")

                update_query: str = get_update_query(self.prov_g, cur_subj)[0]
                if update_query:
                    cur_snapshot.has_update_action(update_query)
            else:
                if last_snapshot_res is None:
                    # CREATION SNAPSHOT
                    cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                    cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been created.")
                else:
                    update_query = get_update_query(self.prov_g, cur_subj)[0]
                    # Direct lookup instead of scanning the whole merge index
                    # once per subject.
                    if cur_subj in merge_index:
                        cur_subj_merge_index = {cur_subj: merge_index[cur_subj]}
                    else:
                        cur_subj_merge_index = {}
                    snapshots_list = self._get_snapshots_from_merge_list(cur_subj_merge_index)
                    if update_query and not snapshots_list:
                        # MODIFICATION SNAPSHOT
                        last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                        last_snapshot.has_invalidation_time(cur_time)
                        cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                        cur_snapshot.derives_from(last_snapshot)
                        cur_snapshot.has_description(f"The entity '{str(cur_subj)}' was modified.")
                        cur_snapshot.has_update_action(update_query)
                    elif snapshots_list:
                        # MERGE SNAPSHOT
                        last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                        last_snapshot.has_invalidation_time(cur_time)
                        cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                        cur_snapshot.derives_from(last_snapshot)
                        for snapshot in snapshots_list:
                            cur_snapshot.derives_from(snapshot)
                        if update_query:
                            cur_snapshot.has_update_action(update_query)
                        cur_snapshot.has_description(self._get_merge_description(cur_subj, snapshots_list))

    @staticmethod
    def _get_merge_description(cur_subj: URIRef, snapshots_list: List[SnapshotEntity]) -> str:
        """Build the description of a merge snapshot.

        :param cur_subj: subject that absorbed the other entities.
        :param snapshots_list: snapshots whose ``prov_subject`` values are listed.
        :return: ``"The entity '<subj>' was merged with '<a>', '<b>'."``; the
            ``with ...`` part is omitted when ``snapshots_list`` is empty.
        """
        merge_description: str = f"The entity '{str(cur_subj)}' was merged"
        if snapshots_list:
            merged_subjects = ", ".join(f"'{snapshot.prov_subject}'" for snapshot in snapshots_list)
            merge_description += f" with {merged_subjects}"
        return merge_description + "."

    def _retrieve_last_snapshot(self, prov_subject: URIRef) -> Optional[URIRef]:
        """Return the URI of the latest snapshot recorded for ``prov_subject``.

        :param prov_subject: subject whose counter is read (converted with ``str``).
        :return: ``<prov_subject>/prov/se/<count>``, or ``None`` when the counter
            is zero or negative.
        """
        last_snapshot_count: str = str(self.counter_handler.read_counter(str(prov_subject)))
        if int(last_snapshot_count) <= 0:
            return None
        return URIRef(str(prov_subject) + '/prov/se/' + last_snapshot_count)

    def _create_snapshot(self, cur_subj: URIRef, cur_time: str) -> SnapshotEntity:
        """Create a new snapshot of ``cur_subj`` generated at ``cur_time``.

        The primary source and the responsible agent are copied from the
        ``source`` and ``resp_agent`` entries of ``self.prov_g.entity_index[cur_subj]``
        whenever they are not ``None``.

        :param cur_subj: subject the snapshot refers to.
        :param cur_time: generation time passed to ``has_generation_time``.
        :return: the newly created snapshot entity.
        """
        new_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj)
        new_snapshot.is_snapshot_of(cur_subj)
        new_snapshot.has_generation_time(cur_time)
        subj_metadata = self.prov_g.entity_index[cur_subj]
        source = subj_metadata['source']
        resp_agent = subj_metadata['resp_agent']
        if source is not None:
            new_snapshot.has_primary_source(URIRef(source))
        if resp_agent is not None:
            new_snapshot.has_resp_agent(URIRef(resp_agent))
        return new_snapshot

    def _get_snapshots_from_merge_list(self, cur_subj_merge_index: dict) -> List[SnapshotEntity]:
        """Collect the latest snapshot of every entity listed in a merge index.

        :param cur_subj_merge_index: mapping whose values are iterables of merged
            entities; the keys are not used.
        :return: one snapshot entity per merged entity that has a recorded
            snapshot; entities without any snapshot are skipped.
        """
        snapshots_list: List[SnapshotEntity] = []
        for merge_entities in cur_subj_merge_index.values():
            for merge_entity in merge_entities:
                last_entity_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(merge_entity)
                if last_entity_snapshot_res is not None:
                    snapshots_list.append(self.add_se(prov_subject=merge_entity, res=last_entity_snapshot_res))
        return snapshots_list

    def add_se(self, prov_subject: URIRef, res: Optional[URIRef] = None) -> SnapshotEntity:
        """Return the snapshot entity for ``res``, creating it when necessary.

        :param prov_subject: subject the snapshot belongs to.
        :param res: URI of an existing snapshot; when ``None`` the subject's
            counter is incremented and a new snapshot is created.
        :return: the entity already registered under ``res`` in
            ``self.res_to_entity``, otherwise a new ``SnapshotEntity``.
        """
        if res is not None and res in self.res_to_entity:
            return self.res_to_entity[res]
        count = self._add_prov(str(prov_subject), res)
        return SnapshotEntity(str(prov_subject), self, count)

    def _add_prov(self, prov_subject: str, res: Optional[URIRef]) -> Optional[str]:
        """Return the snapshot number to use for ``prov_subject``.

        :param prov_subject: subject whose counter is read or updated.
        :param res: URI of an existing snapshot, or ``None`` for a new one.
        :return: the count extracted from ``res`` by ``get_prov_count`` (the
            stored counter is raised to it when it is lower), or the
            incremented counter when ``res`` is ``None``.
        """
        if res is not None:
            res_count: int = int(get_prov_count(res))
            if res_count > self.counter_handler.read_counter(prov_subject):
                self.counter_handler.set_counter(res_count, prov_subject)
            return str(res_count)
        return str(self.counter_handler.increment_counter(prov_subject))

    def get_entity(self, res: str) -> Optional[ProvEntity]:
        """Return the provenance entity registered under ``res``, or ``None``."""
        return self.res_to_entity.get(res)