Coverage for oc_ocdm/prov/prov_set.py: 91%
199 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-05-30 22:05 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-05-30 22:05 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2016, Silvio Peroni <essepuntato@gmail.com>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
16from __future__ import annotations
18import os
19from datetime import datetime, timezone
20from typing import TYPE_CHECKING
22from oc_ocdm.abstract_set import AbstractSet
23from oc_ocdm.prov.entities.snapshot_entity import SnapshotEntity
24from oc_ocdm.support.query_utils import get_update_query
26if TYPE_CHECKING:
27 from typing import Optional, Tuple, List, Dict, ClassVar
28 from oc_ocdm.graph.graph_entity import GraphEntity
30from rdflib import Graph, URIRef
32from oc_ocdm.counter_handler.counter_handler import CounterHandler
33from oc_ocdm.counter_handler.filesystem_counter_handler import \
34 FilesystemCounterHandler
35from oc_ocdm.counter_handler.in_memory_counter_handler import \
36 InMemoryCounterHandler
37from oc_ocdm.counter_handler.sqlite_counter_handler import SqliteCounterHandler
38from oc_ocdm.graph.graph_set import GraphSet
39from oc_ocdm.prov.prov_entity import ProvEntity
40from oc_ocdm.support.support import (get_count, get_prefix, get_short_name)
class ProvSet(AbstractSet):
    """Set of provenance snapshots describing the entities held by a ``GraphSet``.

    Snapshot numbering is delegated to a counter handler: a custom one when
    supplied, a ``FilesystemCounterHandler`` when ``info_dir`` is given, an
    ``InMemoryCounterHandler`` otherwise.
    """

    # Labels
    labels: ClassVar[Dict[str, str]] = {
        "se": "snapshot of entity metadata"
    }

    def __init__(self, prov_subj_graph_set: GraphSet, base_iri: str, info_dir: str = "",
                 wanted_label: bool = True, custom_counter_handler: Optional[CounterHandler] = None,
                 supplier_prefix: str = "") -> None:
        """Initialise the set and choose the counter handler.

        Args:
            prov_subj_graph_set: the ``GraphSet`` whose entities are the subjects of provenance.
            base_iri: stored as ``self.base_iri``.
            info_dir: directory handed to ``FilesystemCounterHandler``; ignored when empty/``None``
                or when ``custom_counter_handler`` is supplied.
            wanted_label: when true, ``_add_prov`` builds a human-readable label for new snapshots.
            custom_counter_handler: counter handler to use instead of the default ones.
            supplier_prefix: forwarded to ``FilesystemCounterHandler`` and stored on the instance.
        """
        super(ProvSet, self).__init__()
        self.prov_g: GraphSet = prov_subj_graph_set
        # Maps the URIRef of a provenance resource to the related provenance entity.
        self.res_to_entity: Dict[URIRef, ProvEntity] = {}
        self.base_iri: str = base_iri
        self.wanted_label: bool = wanted_label
        self.info_dir = info_dir
        self.supplier_prefix = supplier_prefix
        if custom_counter_handler:
            self.counter_handler = custom_counter_handler
        elif info_dir is not None and info_dir != "":
            self.counter_handler = FilesystemCounterHandler(info_dir, supplier_prefix=supplier_prefix)
        else:
            self.counter_handler = InMemoryCounterHandler()

    def get_entity(self, res: URIRef) -> Optional[ProvEntity]:
        """Return the provenance entity registered under ``res``, or ``None`` if there is none."""
        return self.res_to_entity.get(res)

    def add_se(self, prov_subject: GraphEntity, res: Optional[URIRef] = None) -> SnapshotEntity:
        """Return the snapshot entity for ``prov_subject``, creating it when needed.

        Args:
            prov_subject: the graph entity the snapshot refers to.
            res: URI of an already existing snapshot; when ``None`` a new snapshot
                number is drawn from the counter handler.

        Returns:
            The entity already registered under ``res`` if any, otherwise a new ``SnapshotEntity``.

        Raises:
            ValueError: if ``res`` is given and its short name is not ``"se"``.
        """
        if res is not None and get_short_name(res) != "se":
            raise ValueError(f"Given res: <{res}> is inappropriate for a SnapshotEntity entity.")
        if res is not None and res in self.res_to_entity:
            return self.res_to_entity[res]
        g_prov: str = str(prov_subject) + "/prov/"
        supplier_prefix = get_prefix(str(prov_subject.res))
        cur_g, count, label = self._add_prov(g_prov, "se", prov_subject, res, supplier_prefix)
        return SnapshotEntity(prov_subject, cur_g, self, res, prov_subject.resp_agent,
                              prov_subject.source, ProvEntity.iri_entity, count, label, "se")

    def _create_snapshot(self, cur_subj: GraphEntity, cur_time: str) -> SnapshotEntity:
        """Create a new snapshot of ``cur_subj`` generated at ``cur_time``.

        The primary source and the responsible agent are attached only when the
        subject defines them.
        """
        new_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj)
        new_snapshot.is_snapshot_of(cur_subj)
        new_snapshot.has_generation_time(cur_time)
        if cur_subj.source is not None:
            new_snapshot.has_primary_source(URIRef(cur_subj.source))
        if cur_subj.resp_agent is not None:
            new_snapshot.has_resp_agent(URIRef(cur_subj.resp_agent))
        return new_snapshot

    def _get_snapshots_from_merge_list(self, cur_subj: GraphEntity) -> List[SnapshotEntity]:
        """Return the latest snapshot of every entity in ``cur_subj.merge_list`` that has one."""
        snapshots_list: List[SnapshotEntity] = []
        for entity in cur_subj.merge_list:
            last_entity_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(entity.res)
            if last_entity_snapshot_res is not None:
                snapshots_list.append(self.add_se(prov_subject=entity, res=last_entity_snapshot_res))
        return snapshots_list

    @staticmethod
    def _get_merge_description(cur_subj: GraphEntity, snapshots_list: List[SnapshotEntity]) -> str:
        """Build the description of a merge snapshot.

        The result lists, comma-separated and single-quoted, the ``res`` of the
        subject of each snapshot in ``snapshots_list``; with an empty list the
        sentence is just "... has been merged.".
        """
        merged_with: str = ", ".join(f"'{snapshot.prov_subject.res}'" for snapshot in snapshots_list)
        suffix: str = f" with {merged_with}" if merged_with else ""
        return f"The entity '{cur_subj.res}' has been merged{suffix}."

    def generate_provenance(self, c_time: Optional[float] = None) -> set:
        """Create the snapshots required by the current state of every entity in ``self.prov_g``.

        Entities are processed in two passes: first those merged and not marked
        for deletion, then all the others (creation, deletion, restoration and
        modification snapshots).

        Args:
            c_time: POSIX timestamp to use as generation/invalidation time
                (interpreted as UTC); the current UTC time is used when ``None``.

        Returns:
            The set of ``res`` of the entities for which a new snapshot was created.
        """
        modified_entities = set()

        if c_time is None:
            moment: datetime = datetime.now(tz=timezone.utc)
        else:
            moment: datetime = datetime.fromtimestamp(c_time, tz=timezone.utc)
        cur_time: str = moment.replace(microsecond=0).isoformat(sep="T")

        # MERGED ENTITIES
        for cur_subj in self.prov_g.res_to_entity.values():
            if cur_subj is None or (not cur_subj.was_merged or cur_subj.to_be_deleted):
                # Here we must skip every entity that was not merged or that must be deleted.
                continue

            # Previous snapshot
            last_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(cur_subj.res)
            if last_snapshot_res is None:
                # CREATION SNAPSHOT
                cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been created.")
                modified_entities.add(cur_subj.res)
            else:
                update_query: str = get_update_query(cur_subj, entity_type="graph")[0]
                was_modified: bool = (update_query != "")
                snapshots_list: List[SnapshotEntity] = self._get_snapshots_from_merge_list(cur_subj)
                if was_modified and not snapshots_list:
                    # MODIFICATION SNAPSHOT
                    last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                    last_snapshot.has_invalidation_time(cur_time)

                    cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                    cur_snapshot.derives_from(last_snapshot)
                    cur_snapshot.has_update_action(update_query)
                    cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been modified.")
                    modified_entities.add(cur_subj.res)
                elif snapshots_list:
                    # MERGE SNAPSHOT
                    last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                    last_snapshot.has_invalidation_time(cur_time)
                    cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                    cur_snapshot.derives_from(last_snapshot)
                    for snapshot in snapshots_list:
                        cur_snapshot.derives_from(snapshot)
                    if update_query:
                        cur_snapshot.has_update_action(update_query)
                    cur_snapshot.has_description(self._get_merge_description(cur_subj, snapshots_list))
                    modified_entities.add(cur_subj.res)

        # EVERY OTHER ENTITY
        for cur_subj in self.prov_g.res_to_entity.values():
            if cur_subj is None or (cur_subj.was_merged and not cur_subj.to_be_deleted):
                # Here we must skip every entity which was merged while not being marked as to be deleted,
                # since we already processed those entities in the previous loop.
                continue

            last_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(cur_subj.res)
            if last_snapshot_res is None:
                if cur_subj.to_be_deleted:
                    # We can ignore this entity because it was deleted even before being created.
                    pass
                else:
                    # CREATION SNAPSHOT
                    cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                    cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been created.")
                    modified_entities.add(cur_subj.res)
            else:
                update_query: str = get_update_query(cur_subj, entity_type="graph")[0]
                was_modified: bool = (update_query != "")
                if cur_subj.to_be_deleted:
                    # DELETION SNAPSHOT
                    last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                    last_snapshot.has_invalidation_time(cur_time)

                    # The deletion snapshot is itself invalidated at the time it is generated.
                    cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                    cur_snapshot.derives_from(last_snapshot)
                    cur_snapshot.has_invalidation_time(cur_time)
                    cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been deleted.")
                    cur_snapshot.has_update_action(update_query)
                    modified_entities.add(cur_subj.res)
                elif cur_subj.is_restored:
                    # RESTORATION SNAPSHOT
                    last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                    # Don't set invalidation time on previous snapshot for restorations

                    cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                    cur_snapshot.derives_from(last_snapshot)
                    cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been restored.")
                    if update_query:
                        cur_snapshot.has_update_action(update_query)
                    modified_entities.add(cur_subj.res)
                elif was_modified:
                    # MODIFICATION SNAPSHOT
                    last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                    last_snapshot.has_invalidation_time(cur_time)

                    cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                    cur_snapshot.derives_from(last_snapshot)
                    cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been modified.")
                    cur_snapshot.has_update_action(update_query)
                    modified_entities.add(cur_subj.res)
        return modified_entities

    def _add_prov(self, graph_url: str, short_name: str, prov_subject: GraphEntity,
                  res: Optional[URIRef] = None,
                  supplier_prefix: str = "") -> Tuple[Graph, Optional[str], Optional[str]]:
        """Prepare the graph, the counter value and the label for a provenance entity.

        Args:
            graph_url: identifier of the ``Graph`` that is created and returned.
            short_name: key into ``self.labels`` used to build the label.
            prov_subject: the graph entity the provenance entity refers to.
            res: URI of an existing provenance entity. When given, no new number is
                drawn: the stored counter is only raised to the number found in
                ``res`` if that number is greater than the stored one.
            supplier_prefix: forwarded to non-SQLite counter handlers.

        Returns:
            ``(graph, count, label)``. ``count`` and ``label`` are ``None`` when ``res``
            is given; ``label`` is also ``None`` when ``self.wanted_label`` is false.
        """
        cur_g: Graph = Graph(identifier=graph_url)
        self._set_ns(cur_g)

        count: Optional[str] = None
        label: Optional[str] = None
        uses_sqlite: bool = isinstance(self.counter_handler, SqliteCounterHandler)

        if res is not None:
            try:
                res_count: int = int(get_count(res))
            except ValueError:
                # A non-numeric count can never exceed the stored counter.
                res_count: int = -1

            if uses_sqlite:
                cur_count: int = self.counter_handler.read_counter(prov_subject)
            else:
                cur_count: int = self.counter_handler.read_counter(
                    prov_subject.short_name, "se", int(get_count(prov_subject.res)),
                    supplier_prefix=supplier_prefix)

            if res_count > cur_count:
                if uses_sqlite:
                    # The value to store is the snapshot number carried by `res` (as in the
                    # branch below), not the number of the subject entity itself.
                    self.counter_handler.set_counter(res_count, prov_subject)
                else:
                    self.counter_handler.set_counter(
                        res_count, prov_subject.short_name, "se", int(get_count(prov_subject.res)),
                        supplier_prefix=supplier_prefix)
            return cur_g, count, label

        if uses_sqlite:
            count = str(self.counter_handler.increment_counter(prov_subject))
        else:
            count = str(self.counter_handler.increment_counter(
                prov_subject.short_name, "se", int(get_count(prov_subject.res)),
                supplier_prefix=supplier_prefix))

        if self.wanted_label:
            cur_short_name = prov_subject.short_name
            cur_entity_count = get_count(prov_subject.res)
            cur_entity_prefix = get_prefix(prov_subject.res)

            related_to_label = f"related to {GraphSet.labels[cur_short_name]} {cur_entity_prefix}{cur_entity_count}"
            related_to_short_label = f"-> {cur_short_name}/{cur_entity_prefix}{cur_entity_count}"

            label = (f"{self.labels[short_name]} {count} {related_to_label} "
                     f"[{short_name}/{count} {related_to_short_label}]")

        return cur_g, count, label

    @staticmethod
    def _set_ns(g: Graph) -> None:
        """Bind the ``prov`` prefix to ``ProvEntity.PROV`` in the namespace manager of ``g``."""
        g.namespace_manager.bind("prov", ProvEntity.PROV)

    def _retrieve_last_snapshot(self, prov_subject: URIRef) -> Optional[URIRef]:
        """Return the URI of the latest snapshot recorded for ``prov_subject``.

        Args:
            prov_subject: URI of the entity whose snapshot counter is read.

        Returns:
            ``<prov_subject>/prov/se/<n>`` where ``n`` is the stored counter, or
            ``None`` when the counter is not positive.

        Raises:
            ValueError: if the count cannot be extracted from ``prov_subject`` as an
                integer, or if it is not a positive non-zero integer.
        """
        subj_short_name: str = get_short_name(prov_subject)
        try:
            subj_count: int = int(get_count(prov_subject))
        except ValueError as err:
            raise ValueError('prov_subject is not a valid URIRef. Unable to extract the count value!') from err
        # Checked outside the `try` so this more specific message is not replaced
        # by the generic one raised in the handler above.
        if subj_count <= 0:
            raise ValueError('prov_subject is not a valid URIRef. Extracted count value should be a positive '
                             'non-zero integer number!')

        supplier_prefix = get_prefix(str(prov_subject))

        if isinstance(self.counter_handler, SqliteCounterHandler):
            last_snapshot_count: str = str(self.counter_handler.read_counter(prov_subject))
        else:
            last_snapshot_count: str = str(self.counter_handler.read_counter(
                subj_short_name, "se", subj_count, supplier_prefix=supplier_prefix))

        if int(last_snapshot_count) <= 0:
            return None
        return URIRef(str(prov_subject) + '/prov/se/' + last_snapshot_count)

    def get_se(self) -> Tuple[SnapshotEntity, ...]:
        """Return every ``SnapshotEntity`` registered in this set, in registration order."""
        return tuple(entity for entity in self.res_to_entity.values()
                     if isinstance(entity, SnapshotEntity))