Coverage for oc_ocdm / prov / prov_set.py: 95%

195 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-28 18:52 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7# -*- coding: utf-8 -*- 

8from __future__ import annotations 

9 

10import os 

11from datetime import datetime, timezone 

12from typing import TYPE_CHECKING 

13 

14from oc_ocdm.abstract_set import AbstractSet 

15from oc_ocdm.prov.entities.snapshot_entity import SnapshotEntity 

16from oc_ocdm.support.query_utils import get_update_query 

17 

18if TYPE_CHECKING: 

19 from typing import Optional, Tuple, List, Dict, ClassVar 

20 from oc_ocdm.graph.graph_entity import GraphEntity 

21 

22from rdflib import Graph, URIRef 

23 

24from oc_ocdm.counter_handler.counter_handler import CounterHandler 

25from oc_ocdm.counter_handler.filesystem_counter_handler import \ 

26 FilesystemCounterHandler 

27from oc_ocdm.counter_handler.in_memory_counter_handler import \ 

28 InMemoryCounterHandler 

29from oc_ocdm.counter_handler.sqlite_counter_handler import SqliteCounterHandler 

30from oc_ocdm.graph.graph_set import GraphSet 

31from oc_ocdm.prov.prov_entity import ProvEntity 

32from oc_ocdm.support.support import (get_count, get_prefix, get_short_name) 

33 

34 

class ProvSet(AbstractSet[ProvEntity]):
    """Set of provenance entities (snapshots) describing the entities of a GraphSet.

    Snapshot numbering is delegated to a counter handler selected in ``__init__``.
    ``generate_provenance`` inspects every entity of the wrapped GraphSet and creates
    the snapshots recording its creation, modification, merge, deletion or restoration.
    """

    # Human-readable description of each provenance short name, used to build labels.
    labels: ClassVar[Dict[str, str]] = {
        "se": "snapshot of entity metadata"
    }

    def __init__(self, prov_subj_graph_set: GraphSet, base_iri: str, info_dir: str = "",
                 wanted_label: bool = True, custom_counter_handler: Optional[CounterHandler] = None,
                 supplier_prefix: str = "") -> None:
        """Initialise the provenance set.

        :param prov_subj_graph_set: GraphSet whose entities are the subjects of provenance.
        :param base_iri: base IRI, stored on the instance.
        :param info_dir: directory used by a FilesystemCounterHandler when it is non-empty
            and no custom counter handler is supplied.
        :param wanted_label: whether new snapshots get a human-readable label.
        :param custom_counter_handler: counter handler to use; takes precedence over ``info_dir``.
        :param supplier_prefix: stored on the instance and forwarded to the
            FilesystemCounterHandler when one is created.
        """
        super(ProvSet, self).__init__()
        self.prov_g: GraphSet = prov_subj_graph_set
        self.res_to_entity: Dict[URIRef, ProvEntity] = {}
        self.base_iri: str = base_iri
        self.wanted_label: bool = wanted_label
        self.info_dir = info_dir
        self.supplier_prefix = supplier_prefix
        # Counter handler priority: explicit handler > filesystem (info_dir) > in-memory.
        if custom_counter_handler is not None:
            self.counter_handler = custom_counter_handler
        elif info_dir:
            self.counter_handler = FilesystemCounterHandler(info_dir, supplier_prefix=supplier_prefix)
        else:
            self.counter_handler = InMemoryCounterHandler()

    def get_entity(self, res: URIRef) -> Optional[ProvEntity]:
        """Return the provenance entity stored for ``res``, or ``None`` if there is none."""
        return self.res_to_entity.get(res)

    def add_se(self, prov_subject: GraphEntity, res: Optional[URIRef] = None) -> SnapshotEntity:
        """Create, or return the already-known, snapshot entity for ``prov_subject``.

        :param prov_subject: the entity described by the snapshot.
        :param res: IRI of an existing snapshot to re-materialise. When ``None``, a new
            snapshot count is obtained from the counter handler.
        :raises ValueError: if ``res`` is given but its short name is not ``"se"``.
        :return: the SnapshotEntity.
        """
        if res is not None and get_short_name(res) != "se":
            raise ValueError(f"Given res: <{res}> is inappropriate for a SnapshotEntity entity.")
        if res is not None and res in self.res_to_entity:
            entity = self.res_to_entity[res]
            assert isinstance(entity, SnapshotEntity)
            return entity
        g_prov: str = str(prov_subject) + "/prov/"
        supplier_prefix = get_prefix(prov_subject.res)
        cur_g, count, label = self._add_prov(g_prov, "se", prov_subject, res, supplier_prefix)
        return SnapshotEntity(prov_subject, cur_g, self,
                              res, prov_subject.resp_agent, prov_subject.source, count, label)

    def _create_snapshot(self, cur_subj: GraphEntity, cur_time: str) -> SnapshotEntity:
        """Create a new snapshot of ``cur_subj`` generated at ``cur_time``.

        The subject's primary source and responsible agent are copied when set.
        """
        new_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj)
        new_snapshot.is_snapshot_of(cur_subj)
        new_snapshot.has_generation_time(cur_time)
        if cur_subj.source is not None:
            new_snapshot.has_primary_source(URIRef(cur_subj.source))
        if cur_subj.resp_agent is not None:
            new_snapshot.has_resp_agent(URIRef(cur_subj.resp_agent))
        return new_snapshot

    def _get_snapshots_from_merge_list(self, cur_subj: GraphEntity) -> List[SnapshotEntity]:
        """Return the last known snapshot of each entity merged into ``cur_subj``.

        Merged entities that have no snapshot yet are skipped.
        """
        snapshots_list: List[SnapshotEntity] = []
        for entity in cur_subj.merge_list:
            last_entity_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(entity.res)
            if last_entity_snapshot_res is not None:
                snapshots_list.append(self.add_se(prov_subject=entity, res=last_entity_snapshot_res))
        return snapshots_list

    @staticmethod
    def _get_merge_description(cur_subj: GraphEntity, snapshots_list: List[SnapshotEntity]) -> str:
        """Build the description "The entity 'X' has been merged with 'A', 'B'."."""
        merge_description: str = f"The entity '{cur_subj.res}' has been merged"
        if snapshots_list:
            merged_with = ", ".join(f"'{snapshot.prov_subject.res}'" for snapshot in snapshots_list)
            merge_description += f" with {merged_with}"
        return merge_description + "."

    def generate_provenance(self, c_time: Optional[float] = None) -> set:
        """Create the snapshots needed for every entity of the wrapped GraphSet.

        :param c_time: POSIX timestamp used as generation/invalidation time. When
            ``None``, the current UTC time is used. Microseconds are dropped.
        :return: the IRIs of the entities for which a new snapshot was created.
        """
        cur_time: str = self._format_time(c_time)
        modified_entities = set()

        # MERGED ENTITIES: entities that were merged and are kept (not to be deleted).
        for cur_subj in self.prov_g.res_to_entity.values():
            if cur_subj is None or not cur_subj.was_merged or cur_subj.to_be_deleted:
                continue
            if self._process_merged_entity(cur_subj, cur_time):
                modified_entities.add(cur_subj.res)

        # EVERY OTHER ENTITY: merged-and-kept entities were already handled above.
        for cur_subj in self.prov_g.res_to_entity.values():
            if cur_subj is None or (cur_subj.was_merged and not cur_subj.to_be_deleted):
                continue
            if self._process_entity(cur_subj, cur_time):
                modified_entities.add(cur_subj.res)
        return modified_entities

    @staticmethod
    def _format_time(c_time: Optional[float]) -> str:
        """Return ``c_time`` (or now, if ``None``) as a UTC ISO-8601 string without microseconds."""
        if c_time is None:
            moment = datetime.now(tz=timezone.utc)
        else:
            moment = datetime.fromtimestamp(c_time, tz=timezone.utc)
        return moment.replace(microsecond=0).isoformat(sep="T")

    @staticmethod
    def _compute_update_query(cur_subj: GraphEntity) -> Tuple[bool, str]:
        """Return (whether any update query exists, the queries joined with " ; ")."""
        update_queries, _, _ = get_update_query(cur_subj, entity_type="graph")
        return len(update_queries) > 0, " ; ".join(update_queries)

    def _snapshot_from_previous(self, cur_subj: GraphEntity, last_snapshot_res: URIRef,
                                cur_time: str, invalidate_previous: bool = True) -> SnapshotEntity:
        """Create a new snapshot of ``cur_subj`` deriving from its snapshot ``last_snapshot_res``.

        :param invalidate_previous: when True, the previous snapshot gets ``cur_time``
            as invalidation time.
        """
        # The previous snapshot must be loaded before the new one is created, so the
        # counter is first aligned with it and only then incremented.
        last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
        if invalidate_previous:
            last_snapshot.has_invalidation_time(cur_time)
        cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
        cur_snapshot.derives_from(last_snapshot)
        return cur_snapshot

    def _record_creation(self, cur_subj: GraphEntity, cur_time: str) -> None:
        """Create the first snapshot of ``cur_subj``, describing its creation."""
        cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
        cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been created.")

    def _process_merged_entity(self, cur_subj: GraphEntity, cur_time: str) -> bool:
        """Handle a merged, kept entity. Return True if a new snapshot was created."""
        last_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(cur_subj.res)
        if last_snapshot_res is None:
            # CREATION SNAPSHOT
            self._record_creation(cur_subj, cur_time)
            return True

        was_modified, update_query = self._compute_update_query(cur_subj)
        snapshots_list: List[SnapshotEntity] = self._get_snapshots_from_merge_list(cur_subj)
        if snapshots_list:
            # MERGE SNAPSHOT: derives from its own previous snapshot and from the
            # last snapshot of every merged entity.
            cur_snapshot = self._snapshot_from_previous(cur_subj, last_snapshot_res, cur_time)
            for snapshot in snapshots_list:
                cur_snapshot.derives_from(snapshot)
            if update_query:
                cur_snapshot.has_update_action(update_query)
            cur_snapshot.has_description(self._get_merge_description(cur_subj, snapshots_list))
            return True
        if was_modified:
            # MODIFICATION SNAPSHOT
            cur_snapshot = self._snapshot_from_previous(cur_subj, last_snapshot_res, cur_time)
            cur_snapshot.has_update_action(update_query)
            cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been modified.")
            return True
        return False

    def _process_entity(self, cur_subj: GraphEntity, cur_time: str) -> bool:
        """Handle a non-merged (or merged and deleted) entity. Return True if a snapshot was created."""
        last_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(cur_subj.res)
        if last_snapshot_res is None:
            if cur_subj.to_be_deleted:
                # Deleted before ever being created: nothing to record.
                return False
            # CREATION SNAPSHOT
            self._record_creation(cur_subj, cur_time)
            return True

        was_modified, update_query = self._compute_update_query(cur_subj)
        if cur_subj.to_be_deleted:
            # DELETION SNAPSHOT: both the previous and the new snapshot are invalidated.
            # NOTE(review): unlike the other branches, the update action is set even
            # when update_query is empty — preserved as-is; confirm this is intended.
            cur_snapshot = self._snapshot_from_previous(cur_subj, last_snapshot_res, cur_time)
            cur_snapshot.has_invalidation_time(cur_time)
            cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been deleted.")
            cur_snapshot.has_update_action(update_query)
            return True
        if cur_subj.is_restored:
            # RESTORATION SNAPSHOT: the previous snapshot is not invalidated.
            cur_snapshot = self._snapshot_from_previous(cur_subj, last_snapshot_res, cur_time,
                                                        invalidate_previous=False)
            cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been restored.")
            if update_query:
                cur_snapshot.has_update_action(update_query)
            return True
        if was_modified:
            # MODIFICATION SNAPSHOT
            cur_snapshot = self._snapshot_from_previous(cur_subj, last_snapshot_res, cur_time)
            cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been modified.")
            cur_snapshot.has_update_action(update_query)
            return True
        return False

    def _read_prov_counter(self, prov_subject: GraphEntity, short_name: str, supplier_prefix: str) -> int:
        """Return the current ``short_name`` counter of ``prov_subject``.

        The SQLite handler is keyed by the subject IRI alone.
        """
        if isinstance(self.counter_handler, SqliteCounterHandler):
            return self.counter_handler.read_counter(str(prov_subject))
        return self.counter_handler.read_counter(prov_subject.short_name, short_name,
                                                 int(get_count(prov_subject.res)),
                                                 supplier_prefix=supplier_prefix)

    def _set_prov_counter(self, new_value: int, prov_subject: GraphEntity, short_name: str,
                          supplier_prefix: str) -> None:
        """Set the ``short_name`` counter of ``prov_subject`` to ``new_value``."""
        if isinstance(self.counter_handler, SqliteCounterHandler):
            self.counter_handler.set_counter(new_value, str(prov_subject))
        else:
            self.counter_handler.set_counter(new_value, prov_subject.short_name, short_name,
                                             int(get_count(prov_subject.res)),
                                             supplier_prefix=supplier_prefix)

    def _increment_prov_counter(self, prov_subject: GraphEntity, short_name: str, supplier_prefix: str) -> int:
        """Increment and return the ``short_name`` counter of ``prov_subject``."""
        if isinstance(self.counter_handler, SqliteCounterHandler):
            return self.counter_handler.increment_counter(str(prov_subject))
        return self.counter_handler.increment_counter(prov_subject.short_name, short_name,
                                                      int(get_count(prov_subject.res)),
                                                      supplier_prefix=supplier_prefix)

    def _add_prov(self, graph_url: str, short_name: str, prov_subject: GraphEntity,
                  res: Optional[URIRef] = None, supplier_prefix: str = "") -> Tuple[Graph, Optional[str], Optional[str]]:
        """Prepare the named graph of a provenance entity and its count/label.

        :param graph_url: identifier of the graph to create.
        :param short_name: provenance short name (a key of ``labels``).
        :param prov_subject: the entity described by the provenance entity.
        :param res: IRI of an existing provenance entity. When given, no new count is
            reserved: the counter is only raised to ``res``'s count if it lags behind,
            and ``count``/``label`` are returned as ``None``.
        :param supplier_prefix: forwarded to the non-SQLite counter handlers.
        :return: ``(graph, count, label)``. ``label`` is ``None`` when ``wanted_label`` is False.
        """
        cur_g: Graph = Graph(identifier=graph_url)
        self._set_ns(cur_g)

        count: Optional[str] = None
        label: Optional[str] = None

        if res is not None:
            try:
                res_count: int = int(get_count(res))
            except ValueError:
                res_count = -1

            cur_count: int = self._read_prov_counter(prov_subject, short_name, supplier_prefix)
            if res_count > cur_count:
                # Align the counter with the highest known provenance entity, so that
                # newly created ones do not reuse its number.
                self._set_prov_counter(res_count, prov_subject, short_name, supplier_prefix)
            return cur_g, count, label

        count = str(self._increment_prov_counter(prov_subject, short_name, supplier_prefix))

        if self.wanted_label:
            cur_short_name = prov_subject.short_name
            cur_entity_count = get_count(prov_subject.res)
            cur_entity_prefix = get_prefix(prov_subject.res)

            related_to_label = (f"related to {GraphSet.labels[cur_short_name]} "
                                f"{cur_entity_prefix}{cur_entity_count}")
            related_to_short_label = f"-> {cur_short_name}/{cur_entity_prefix}{cur_entity_count}"

            label = (f"{self.labels[short_name]} {count} {related_to_label} "
                     f"[{short_name}/{count} {related_to_short_label}]")

        return cur_g, count, label

    @staticmethod
    def _set_ns(g: Graph) -> None:
        """Bind the ``prov`` prefix on ``g``'s namespace manager."""
        g.namespace_manager.bind("prov", ProvEntity.PROV)

    def _retrieve_last_snapshot(self, prov_subject: URIRef) -> Optional[URIRef]:
        """Return the IRI of the latest snapshot of ``prov_subject``, or ``None`` if it has none.

        :raises ValueError: if no count can be extracted from ``prov_subject``, or the
            extracted count is not a positive integer.
        """
        subj_short_name: str = get_short_name(prov_subject)
        try:
            subj_count: int = int(get_count(prov_subject))
        except (TypeError, ValueError) as exc:
            raise ValueError('prov_subject is not a valid URIRef. Unable to extract the count value!') from exc
        # Checked outside the try so this message is not swallowed by the handler above.
        if subj_count <= 0:
            raise ValueError('prov_subject is not a valid URIRef. Extracted count value should be a positive '
                             'non-zero integer number!')

        supplier_prefix = get_prefix(prov_subject)

        if isinstance(self.counter_handler, SqliteCounterHandler):
            last_snapshot_count: str = str(self.counter_handler.read_counter(str(prov_subject)))
        else:
            last_snapshot_count = str(self.counter_handler.read_counter(
                subj_short_name, "se", subj_count, supplier_prefix=supplier_prefix))

        if int(last_snapshot_count) <= 0:
            return None
        return URIRef(str(prov_subject) + '/prov/se/' + last_snapshot_count)

    def get_se(self) -> Tuple[SnapshotEntity, ...]:
        """Return every SnapshotEntity held by this set."""
        return tuple(entity for entity in self.res_to_entity.values() if isinstance(entity, SnapshotEntity))