Coverage for rdflib_ocdm/prov/provenance.py: 92%

122 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-11-01 22:02 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright 2023 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17from __future__ import annotations 

18 

19from typing import TYPE_CHECKING 

20 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true

22 from rdflib_ocdm.ocdm_graph import OCDMGraphCommons 

23 from typing import List, Dict, Optional 

24 

25from collections import OrderedDict 

26from datetime import datetime, timezone 

27 

28from rdflib import Dataset, URIRef 

29 

30from rdflib_ocdm.counter_handler.counter_handler import CounterHandler 

31from rdflib_ocdm.counter_handler.in_memory_counter_handler import \ 

32 InMemoryCounterHandler 

33from rdflib_ocdm.prov.prov_entity import ProvEntity 

34from rdflib_ocdm.prov.snapshot_entity import SnapshotEntity 

35from rdflib_ocdm.query_utils import get_update_query 

36from rdflib_ocdm.support import get_prov_count 

37 

38 

39class OCDMProvenance(Dataset): 

def __init__(self, prov_subj_graph: OCDMGraphCommons, counter_handler: CounterHandler = None):
    """Set up an empty provenance dataset bound to ``prov_subj_graph``.

    :param prov_subj_graph: graph whose entities will be described; it is
        stored as ``self.prov_g``.
    :param counter_handler: store of the per-entity snapshot counters. When
        omitted, a new ``InMemoryCounterHandler`` is created.
    """
    Dataset.__init__(self)
    self.prov_g = prov_subj_graph
    # Maps the URIRef of a provenance resource to the entity object wrapping it.
    self.res_to_entity: Dict[URIRef, ProvEntity] = {}
    self.all_entities = set()
    self.counter_handler = InMemoryCounterHandler() if counter_handler is None else counter_handler

49 

def generate_provenance(self, c_time: float = None) -> None:
    """Record a provenance snapshot for the entities tracked by ``self.prov_g``.

    Entities whose metadata is not flagged ``to_be_deleted`` are processed
    first, the ones to be deleted last. Depending on the metadata flags and
    on whether a previous snapshot exists, each entity gets a deletion,
    restoration, creation, modification or merge snapshot. An entity that
    already has a snapshot, has no pending update query and has no merged
    entities gets no new snapshot.

    :param c_time: POSIX timestamp to use as generation/invalidation time.
        When ``None`` the current UTC time is used. Microseconds are dropped
        in both cases.
    """
    if c_time is None:
        moment = datetime.now(tz=timezone.utc)
    else:
        moment = datetime.fromtimestamp(c_time, tz=timezone.utc)
    cur_time: str = moment.replace(microsecond=0).isoformat(sep="T")

    merge_index = self.prov_g.merge_index
    # sorted() stays stable with reverse=True, so entities keep their
    # original relative order inside the "kept" and the "deleted" group.
    ordered_subjects = sorted(
        self.prov_g.entity_index.items(),
        key=lambda item: not item[1]['to_be_deleted'],
        reverse=True,
    )
    for cur_subj, cur_subj_metadata in ordered_subjects:
        last_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(str(cur_subj))
        if cur_subj_metadata['to_be_deleted']:
            update_query: str = get_update_query(self.prov_g, cur_subj)[0]
            # DELETION SNAPSHOT
            last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
            last_snapshot.has_invalidation_time(cur_time)

            cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
            cur_snapshot.derives_from(last_snapshot)
            # The deletion snapshot is generated and invalidated at the same time.
            cur_snapshot.has_invalidation_time(cur_time)
            cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been deleted.")
            cur_snapshot.has_update_action(update_query)
        elif cur_subj_metadata['is_restored']:
            # RESTORATION SNAPSHOT
            last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
            # On restore, the previous snapshot's invalidation time is not set.

            cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
            cur_snapshot.derives_from(last_snapshot)
            cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been restored.")

            update_query: str = get_update_query(self.prov_g, cur_subj)[0]
            if update_query:
                cur_snapshot.has_update_action(update_query)
        elif last_snapshot_res is None:
            # CREATION SNAPSHOT
            cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
            cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been created.")
        else:
            update_query = get_update_query(self.prov_g, cur_subj)[0]
            # Direct key lookup instead of scanning the whole merge index
            # once per subject.
            if cur_subj in merge_index:
                cur_subj_merge_index = {cur_subj: merge_index[cur_subj]}
            else:
                cur_subj_merge_index = {}
            snapshots_list = self._get_snapshots_from_merge_list(cur_subj_merge_index)
            if update_query and not snapshots_list:
                # MODIFICATION SNAPSHOT
                last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                last_snapshot.has_invalidation_time(cur_time)
                cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                cur_snapshot.derives_from(last_snapshot)
                cur_snapshot.has_description(f"The entity '{str(cur_subj)}' was modified.")
                cur_snapshot.has_update_action(update_query)
            elif snapshots_list:
                # MERGE SNAPSHOT
                last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                last_snapshot.has_invalidation_time(cur_time)
                cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                cur_snapshot.derives_from(last_snapshot)
                # The merge snapshot also derives from the latest snapshot
                # of every entity merged into the current subject.
                for snapshot in snapshots_list:
                    cur_snapshot.derives_from(snapshot)
                if update_query:
                    cur_snapshot.has_update_action(update_query)
                cur_snapshot.has_description(self._get_merge_description(cur_subj, snapshots_list))

110 

111 @staticmethod 

112 def _get_merge_description(cur_subj: URIRef, snapshots_list: List[SnapshotEntity]) -> str: 

113 merge_description: str = f"The entity '{str(cur_subj)}' was merged" 

114 is_first: bool = True 

115 for snapshot in snapshots_list: 

116 if is_first: 116 ↛ 120line 116 didn't jump to line 120 because the condition on line 116 was always true

117 merge_description += f" with '{snapshot.prov_subject}'" 

118 is_first = False 

119 else: 

120 merge_description += f", '{snapshot.prov_subject}'" 

121 merge_description += "." 

122 return merge_description 

123 

124 def _retrieve_last_snapshot(self, prov_subject: URIRef) -> Optional[URIRef]: 

125 last_snapshot_count: str = str(self.counter_handler.read_counter(str(prov_subject))) 

126 if int(last_snapshot_count) <= 0: 

127 return None 

128 else: 

129 return URIRef(str(prov_subject) + '/prov/se/' + last_snapshot_count) 

130 

131 def _create_snapshot(self, cur_subj: URIRef, cur_time: str) -> SnapshotEntity: 

132 new_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj) 

133 new_snapshot.is_snapshot_of(cur_subj) 

134 new_snapshot.has_generation_time(cur_time) 

135 source = self.prov_g.entity_index[cur_subj]['source'] 

136 resp_agent = self.prov_g.entity_index[cur_subj]['resp_agent'] 

137 if source is not None: 

138 new_snapshot.has_primary_source(URIRef(source)) 

139 if resp_agent is not None: 

140 new_snapshot.has_resp_agent(URIRef(resp_agent)) 

141 return new_snapshot 

142 

143 def _get_snapshots_from_merge_list(self, cur_subj_merge_index: dict) -> List[SnapshotEntity]: 

144 snapshots_list: List[SnapshotEntity] = [] 

145 for _, merge_entities in cur_subj_merge_index.items(): 

146 for merge_entity in merge_entities: 

147 last_entity_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(merge_entity) 

148 if last_entity_snapshot_res is not None: 148 ↛ 146line 148 didn't jump to line 146 because the condition on line 148 was always true

149 snapshots_list.append(self.add_se(prov_subject=merge_entity, res=last_entity_snapshot_res)) 

150 return snapshots_list 

151 

def add_se(self, prov_subject: URIRef, res: URIRef = None) -> SnapshotEntity:
    """Return the snapshot entity for ``res``, or create one for ``prov_subject``.

    :param prov_subject: entity the snapshot belongs to.
    :param res: URI of an existing snapshot. When it is already a key of
        ``self.res_to_entity`` the stored entity is returned as is.
    :return: the stored entity, or a new ``SnapshotEntity`` numbered by
        ``self._add_prov``.
    """
    already_known = res is not None and res in self.res_to_entity
    if already_known:
        return self.res_to_entity[res]
    subject_iri = str(prov_subject)
    count = self._add_prov(subject_iri, res)
    return SnapshotEntity(subject_iri, self, count)

158 

159 def _add_prov(self, prov_subject: str, res: URIRef) -> Optional[str]: 

160 if res is not None: 

161 res_count: int = int(get_prov_count(res)) 

162 if res_count > self.counter_handler.read_counter(prov_subject): 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true

163 self.counter_handler.set_counter(res_count, prov_subject) 

164 return str(res_count) 

165 return str(self.counter_handler.increment_counter(prov_subject)) 

166 

def get_entity(self, res: str) -> Optional[ProvEntity]:
    """Look up the provenance entity registered under ``res``.

    :param res: key of the wanted entity in ``self.res_to_entity``.
    :return: the stored entity, or ``None`` when ``res`` is not a key.
    """
    return self.res_to_entity.get(res)