Coverage for oc_ocdm/prov/prov_set.py: 91%

199 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-05-30 22:05 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16from __future__ import annotations 

17 

18import os 

19from datetime import datetime, timezone 

20from typing import TYPE_CHECKING 

21 

22from oc_ocdm.abstract_set import AbstractSet 

23from oc_ocdm.prov.entities.snapshot_entity import SnapshotEntity 

24from oc_ocdm.support.query_utils import get_update_query 

25 

26if TYPE_CHECKING: 

27 from typing import Optional, Tuple, List, Dict, ClassVar 

28 from oc_ocdm.graph.graph_entity import GraphEntity 

29 

30from rdflib import Graph, URIRef 

31 

32from oc_ocdm.counter_handler.counter_handler import CounterHandler 

33from oc_ocdm.counter_handler.filesystem_counter_handler import \ 

34 FilesystemCounterHandler 

35from oc_ocdm.counter_handler.in_memory_counter_handler import \ 

36 InMemoryCounterHandler 

37from oc_ocdm.counter_handler.sqlite_counter_handler import SqliteCounterHandler 

38from oc_ocdm.graph.graph_set import GraphSet 

39from oc_ocdm.prov.prov_entity import ProvEntity 

40from oc_ocdm.support.support import (get_count, get_prefix, get_short_name) 

41 

42 

class ProvSet(AbstractSet):
    """Set of provenance snapshots describing the entities of a ``GraphSet``.

    For every entity held by the wrapped graph set, :meth:`generate_provenance`
    creates ``SnapshotEntity`` objects (short name ``se``) recording creation,
    modification, merge, deletion and restoration events. Snapshot numbers are
    obtained from a counter handler chosen in :meth:`__init__`.
    """

    # Labels
    labels: ClassVar[Dict[str, str]] = {
        "se": "snapshot of entity metadata"
    }

    def __init__(self, prov_subj_graph_set: GraphSet, base_iri: str, info_dir: str = "",
                 wanted_label: bool = True, custom_counter_handler: Optional[CounterHandler] = None,
                 supplier_prefix: str = "") -> None:
        """Initialise the provenance set.

        :param prov_subj_graph_set: graph set whose entities are the subjects of the provenance
        :param base_iri: base IRI stored on the instance
        :param info_dir: directory handed to ``FilesystemCounterHandler``; when empty (or ``None``)
            and no custom handler is given, an ``InMemoryCounterHandler`` is used instead
        :param wanted_label: whether ``_add_prov`` must build a human-readable label
        :param custom_counter_handler: counter handler taking precedence over ``info_dir``
        :param supplier_prefix: prefix forwarded to ``FilesystemCounterHandler``
        """
        super().__init__()
        self.prov_g: GraphSet = prov_subj_graph_set
        # The following variable maps a URIRef with the related provenance entity
        # (it is never written to in this class; presumably the entities register
        # themselves on construction -- TODO confirm against ProvEntity).
        self.res_to_entity: Dict[URIRef, ProvEntity] = {}
        self.base_iri: str = base_iri
        self.wanted_label: bool = wanted_label
        self.info_dir = info_dir
        self.supplier_prefix = supplier_prefix

        # Counter handler priority: explicit handler > filesystem (if a directory
        # was given) > in-memory.
        if custom_counter_handler:
            self.counter_handler = custom_counter_handler
        elif info_dir is not None and info_dir != "":
            self.counter_handler = FilesystemCounterHandler(info_dir, supplier_prefix=supplier_prefix)
        else:
            self.counter_handler = InMemoryCounterHandler()

    def get_entity(self, res: URIRef) -> Optional[ProvEntity]:
        """Return the provenance entity registered for ``res``, or ``None`` if there is none."""
        return self.res_to_entity.get(res)

    def add_se(self, prov_subject: GraphEntity, res: Optional[URIRef] = None) -> SnapshotEntity:
        """Return the snapshot entity for ``prov_subject``, creating it when needed.

        :param prov_subject: the entity the snapshot refers to
        :param res: URI of an already existing snapshot; when ``None`` a new
            snapshot number is drawn from the counter handler
        :raises ValueError: if ``res`` is given but its short name is not ``se``
        :return: the entity already registered for ``res`` if there is one,
            otherwise a newly built ``SnapshotEntity``
        """
        if res is not None:
            if get_short_name(res) != "se":
                raise ValueError(f"Given res: <{res}> is inappropriate for a SnapshotEntity entity.")
            if res in self.res_to_entity:
                return self.res_to_entity[res]

        g_prov: str = str(prov_subject) + "/prov/"
        supplier_prefix = get_prefix(str(prov_subject.res))
        cur_g, count, label = self._add_prov(g_prov, "se", prov_subject, res, supplier_prefix)
        return SnapshotEntity(prov_subject, cur_g, self, res, prov_subject.resp_agent,
                              prov_subject.source, ProvEntity.iri_entity, count, label, "se")

    def _create_snapshot(self, cur_subj: GraphEntity, cur_time: str) -> SnapshotEntity:
        """Create a new snapshot of ``cur_subj`` generated at ``cur_time``.

        The primary source and the responsible agent are attached only when
        ``cur_subj`` defines them.
        """
        new_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj)
        new_snapshot.is_snapshot_of(cur_subj)
        new_snapshot.has_generation_time(cur_time)
        if cur_subj.source is not None:
            new_snapshot.has_primary_source(URIRef(cur_subj.source))
        if cur_subj.resp_agent is not None:
            new_snapshot.has_resp_agent(URIRef(cur_subj.resp_agent))
        return new_snapshot

    def _create_derived_snapshot(self, cur_subj: GraphEntity, last_snapshot_res: URIRef, cur_time: str,
                                 invalidate_last: bool = True) -> SnapshotEntity:
        """Create a new snapshot of ``cur_subj`` deriving from its latest one.

        :param cur_subj: the entity being described
        :param last_snapshot_res: URI of the latest existing snapshot of ``cur_subj``
        :param cur_time: generation time of the new snapshot
        :param invalidate_last: when true, ``cur_time`` is also set as the
            invalidation time of the latest existing snapshot
        :return: the newly created snapshot
        """
        last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
        if invalidate_last:
            last_snapshot.has_invalidation_time(cur_time)

        cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
        cur_snapshot.derives_from(last_snapshot)
        return cur_snapshot

    def _get_snapshots_from_merge_list(self, cur_subj: GraphEntity) -> List[SnapshotEntity]:
        """Return the latest snapshot of every entity in ``cur_subj.merge_list`` that has one."""
        snapshots_list: List[SnapshotEntity] = []
        for entity in cur_subj.merge_list:
            last_entity_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(entity.res)
            if last_entity_snapshot_res is not None:
                snapshots_list.append(self.add_se(prov_subject=entity, res=last_entity_snapshot_res))
        return snapshots_list

    @staticmethod
    def _get_merge_description(cur_subj: GraphEntity, snapshots_list: List[SnapshotEntity]) -> str:
        """Build the textual description of a merge.

        The result reads ``The entity '<res>' has been merged with '<a>', '<b>'.``;
        the ``with ...`` part is omitted when ``snapshots_list`` is empty.
        """
        merged_with = ", ".join(f"'{snapshot.prov_subject.res}'" for snapshot in snapshots_list)
        merge_description: str = f"The entity '{cur_subj.res}' has been merged"
        if merged_with:
            merge_description += f" with {merged_with}"
        return merge_description + "."

    def generate_provenance(self, c_time: Optional[float] = None) -> set:
        """Generate the provenance snapshots for every entity of the wrapped graph set.

        Entities that were merged (and are not to be deleted) are processed
        first, then all the remaining ones. Depending on the state of each
        entity a creation, modification, merge, deletion or restoration
        snapshot is produced.

        :param c_time: POSIX timestamp to use as the time of the snapshots;
            when ``None`` the current UTC time is used. Microseconds are dropped.
        :return: the ``res`` of every entity for which a snapshot was created
        """
        modified_entities = set()

        if c_time is None:
            moment = datetime.now(tz=timezone.utc)
        else:
            moment = datetime.fromtimestamp(c_time, tz=timezone.utc)
        cur_time: str = moment.replace(microsecond=0).isoformat(sep="T")

        # MERGED ENTITIES
        for cur_subj in self.prov_g.res_to_entity.values():
            if cur_subj is None or (not cur_subj.was_merged or cur_subj.to_be_deleted):
                # Here we must skip every entity that was not merged or that must be deleted.
                continue

            # Previous snapshot
            last_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(cur_subj.res)
            if last_snapshot_res is None:
                # CREATION SNAPSHOT
                cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been created.")
                modified_entities.add(cur_subj.res)
                continue

            update_query: str = get_update_query(cur_subj, entity_type="graph")[0]
            was_modified: bool = (update_query != "")
            snapshots_list: List[SnapshotEntity] = self._get_snapshots_from_merge_list(cur_subj)
            if snapshots_list:
                # MERGE SNAPSHOT
                cur_snapshot = self._create_derived_snapshot(cur_subj, last_snapshot_res, cur_time)
                for snapshot in snapshots_list:
                    cur_snapshot.derives_from(snapshot)
                if was_modified:
                    cur_snapshot.has_update_action(update_query)
                cur_snapshot.has_description(self._get_merge_description(cur_subj, snapshots_list))
                modified_entities.add(cur_subj.res)
            elif was_modified:
                # MODIFICATION SNAPSHOT (none of the merged entities had a snapshot)
                cur_snapshot = self._create_derived_snapshot(cur_subj, last_snapshot_res, cur_time)
                cur_snapshot.has_update_action(update_query)
                cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been modified.")
                modified_entities.add(cur_subj.res)

        # EVERY OTHER ENTITY
        for cur_subj in self.prov_g.res_to_entity.values():
            if cur_subj is None or (cur_subj.was_merged and not cur_subj.to_be_deleted):
                # Here we must skip every entity which was merged while not being marked as to be deleted,
                # since we already processed those entities in the previous loop.
                continue

            last_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(cur_subj.res)
            if last_snapshot_res is None:
                # An entity deleted even before being created can be ignored.
                if not cur_subj.to_be_deleted:
                    # CREATION SNAPSHOT
                    cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                    cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been created.")
                    modified_entities.add(cur_subj.res)
                continue

            update_query: str = get_update_query(cur_subj, entity_type="graph")[0]
            was_modified: bool = (update_query != "")
            if cur_subj.to_be_deleted:
                # DELETION SNAPSHOT: the new snapshot is invalidated as soon as it is generated.
                cur_snapshot = self._create_derived_snapshot(cur_subj, last_snapshot_res, cur_time)
                cur_snapshot.has_invalidation_time(cur_time)
                cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been deleted.")
                cur_snapshot.has_update_action(update_query)
                modified_entities.add(cur_subj.res)
            elif cur_subj.is_restored:
                # RESTORATION SNAPSHOT
                # Don't set invalidation time on previous snapshot for restorations
                cur_snapshot = self._create_derived_snapshot(cur_subj, last_snapshot_res, cur_time,
                                                             invalidate_last=False)
                cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been restored.")
                if was_modified:
                    cur_snapshot.has_update_action(update_query)
                modified_entities.add(cur_subj.res)
            elif was_modified:
                # MODIFICATION SNAPSHOT
                cur_snapshot = self._create_derived_snapshot(cur_subj, last_snapshot_res, cur_time)
                cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been modified.")
                cur_snapshot.has_update_action(update_query)
                modified_entities.add(cur_subj.res)
        return modified_entities

    def _add_prov(self, graph_url: str, short_name: str, prov_subject: GraphEntity,
                  res: Optional[URIRef] = None,
                  supplier_prefix: str = "") -> Tuple[Graph, Optional[str], Optional[str]]:
        """Prepare the graph, the counter value and the label for a provenance entity.

        :param graph_url: identifier of the graph to create
        :param short_name: short name of the provenance entity (key of ``labels``)
        :param prov_subject: the entity the provenance refers to
        :param res: URI of an already existing provenance entity, if any
        :param supplier_prefix: prefix forwarded to the counter handler
        :return: ``(graph, count, label)``. When ``res`` is given, ``count`` and
            ``label`` are ``None`` and the stored counter is only raised to the
            number found in ``res`` if that number is greater. Otherwise the
            counter is incremented, and ``label`` is built only if
            ``wanted_label`` is true.
        """
        cur_g: Graph = Graph(identifier=graph_url)
        self._set_ns(cur_g)

        count: Optional[str] = None
        label: Optional[str] = None
        uses_sqlite: bool = isinstance(self.counter_handler, SqliteCounterHandler)

        if res is not None:
            try:
                res_count: int = int(get_count(res))
            except ValueError:
                # A non-numeric count can never exceed the stored counter.
                res_count = -1

            if uses_sqlite:
                cur_count: int = self.counter_handler.read_counter(prov_subject)
            else:
                cur_count = self.counter_handler.read_counter(
                    prov_subject.short_name, "se", int(get_count(prov_subject.res)),
                    supplier_prefix=supplier_prefix)

            if res_count > cur_count:
                if uses_sqlite:
                    # The stored value must be the snapshot number carried by ``res``
                    # (as in the non-SQLite branch), not the number of the described entity.
                    self.counter_handler.set_counter(res_count, prov_subject)
                else:
                    self.counter_handler.set_counter(
                        res_count, prov_subject.short_name, "se", int(get_count(prov_subject.res)),
                        supplier_prefix=supplier_prefix)
            return cur_g, count, label

        if uses_sqlite:
            count = str(self.counter_handler.increment_counter(prov_subject))
        else:
            count = str(self.counter_handler.increment_counter(
                prov_subject.short_name, "se", int(get_count(prov_subject.res)),
                supplier_prefix=supplier_prefix))

        if self.wanted_label:
            cur_short_name = prov_subject.short_name
            cur_entity_count = get_count(prov_subject.res)
            cur_entity_prefix = get_prefix(prov_subject.res)

            related_to_label = f"related to {GraphSet.labels[cur_short_name]} {cur_entity_prefix}{cur_entity_count}"
            related_to_short_label = f"-> {cur_short_name}/{cur_entity_prefix}{cur_entity_count}"

            label = (f"{self.labels[short_name]} {count} {related_to_label} "
                     f"[{short_name}/{count} {related_to_short_label}]")

        return cur_g, count, label

    @staticmethod
    def _set_ns(g: Graph) -> None:
        """Bind the ``prov`` prefix to ``ProvEntity.PROV`` in the namespace manager of ``g``."""
        g.namespace_manager.bind("prov", ProvEntity.PROV)

    def _retrieve_last_snapshot(self, prov_subject: URIRef) -> Optional[URIRef]:
        """Return the URI of the latest snapshot of ``prov_subject``.

        :param prov_subject: URI of the entity whose snapshots are looked up
        :raises ValueError: if the count extracted from ``prov_subject`` is not
            an integer, or is not strictly positive
        :return: ``<prov_subject>/prov/se/<n>`` where ``n`` is the value read
            from the counter handler, or ``None`` when that value is not positive
        """
        subj_short_name: str = get_short_name(prov_subject)
        try:
            subj_count: int = int(get_count(prov_subject))
        except ValueError as err:
            raise ValueError('prov_subject is not a valid URIRef. Unable to extract the count value!') from err
        # This check lives outside the ``try`` so that its message is not
        # swallowed and replaced by the handler above.
        if subj_count <= 0:
            raise ValueError('prov_subject is not a valid URIRef. Extracted count value should be a positive '
                             'non-zero integer number!')

        supplier_prefix = get_prefix(str(prov_subject))

        if isinstance(self.counter_handler, SqliteCounterHandler):
            last_snapshot_count = self.counter_handler.read_counter(prov_subject)
        else:
            last_snapshot_count = self.counter_handler.read_counter(
                subj_short_name, "se", subj_count, supplier_prefix=supplier_prefix)

        if int(last_snapshot_count) <= 0:
            return None
        return URIRef(str(prov_subject) + '/prov/se/' + str(last_snapshot_count))

    def get_se(self) -> Tuple[SnapshotEntity, ...]:
        """Return every ``SnapshotEntity`` registered in this set, in registration order."""
        return tuple(entity for entity in self.res_to_entity.values()
                     if isinstance(entity, SnapshotEntity))

290 if isinstance(entity, SnapshotEntity): 

291 result += (entity, ) 

292 return result