Coverage for oc_ocdm / prov / prov_set.py: 95%
195 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-28 18:52 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-28 18:52 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2022-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7# -*- coding: utf-8 -*-
8from __future__ import annotations
10import os
11from datetime import datetime, timezone
12from typing import TYPE_CHECKING
14from oc_ocdm.abstract_set import AbstractSet
15from oc_ocdm.prov.entities.snapshot_entity import SnapshotEntity
16from oc_ocdm.support.query_utils import get_update_query
18if TYPE_CHECKING:
19 from typing import Optional, Tuple, List, Dict, ClassVar
20 from oc_ocdm.graph.graph_entity import GraphEntity
22from rdflib import Graph, URIRef
24from oc_ocdm.counter_handler.counter_handler import CounterHandler
25from oc_ocdm.counter_handler.filesystem_counter_handler import \
26 FilesystemCounterHandler
27from oc_ocdm.counter_handler.in_memory_counter_handler import \
28 InMemoryCounterHandler
29from oc_ocdm.counter_handler.sqlite_counter_handler import SqliteCounterHandler
30from oc_ocdm.graph.graph_set import GraphSet
31from oc_ocdm.prov.prov_entity import ProvEntity
32from oc_ocdm.support.support import (get_count, get_prefix, get_short_name)
class ProvSet(AbstractSet[ProvEntity]):
    """Collection of provenance entities built for the entities of a ``GraphSet``.

    The only provenance entity kind handled here is the snapshot (short name
    ``se``). Snapshot numbering is delegated to a ``CounterHandler``, which
    keeps, for every described entity, the number of its most recent snapshot.
    """

    # Human-readable label fragment for each provenance short name.
    labels: ClassVar[Dict[str, str]] = {
        "se": "snapshot of entity metadata"
    }

    def __init__(self, prov_subj_graph_set: GraphSet, base_iri: str, info_dir: str = "",
                 wanted_label: bool = True, custom_counter_handler: Optional[CounterHandler] = None,
                 supplier_prefix: str = "") -> None:
        """Initialise the set and choose the counter handler.

        Args:
            prov_subj_graph_set: graph set whose entities are the subjects of
                the provenance generated by :meth:`generate_provenance`.
            base_iri: base IRI, stored on the instance.
            info_dir: when non-empty, directory handed to a
                ``FilesystemCounterHandler``.
            wanted_label: whether a label is built for newly created snapshots.
            custom_counter_handler: handler to use instead of the default ones;
                it takes precedence over ``info_dir`` when truthy.
            supplier_prefix: stored on the instance and forwarded to the
                ``FilesystemCounterHandler`` when one is created.
        """
        super(ProvSet, self).__init__()
        self.prov_g: GraphSet = prov_subj_graph_set
        # NOTE(review): nothing in this class writes to this mapping; presumably
        # provenance entities register themselves on construction - confirm.
        self.res_to_entity: Dict[URIRef, ProvEntity] = {}
        self.base_iri: str = base_iri
        self.wanted_label: bool = wanted_label
        self.info_dir = info_dir
        self.supplier_prefix = supplier_prefix

        # Handler precedence: explicit handler > filesystem (info_dir) > in memory.
        if custom_counter_handler:
            self.counter_handler = custom_counter_handler
        elif info_dir is not None and info_dir != "":
            self.counter_handler = FilesystemCounterHandler(info_dir, supplier_prefix=supplier_prefix)
        else:
            self.counter_handler = InMemoryCounterHandler()

    def get_entity(self, res: URIRef) -> Optional[ProvEntity]:
        """Return the provenance entity registered under ``res``, or ``None``."""
        return self.res_to_entity.get(res)

    def add_se(self, prov_subject: GraphEntity, res: Optional[URIRef] = None) -> SnapshotEntity:
        """Return the snapshot entity for ``prov_subject``, creating it if needed.

        Args:
            prov_subject: entity the snapshot describes.
            res: IRI of an already existing snapshot. When ``None`` a new
                snapshot number is taken from the counter handler.

        Returns:
            The entity already registered under ``res`` when there is one,
            otherwise a newly built ``SnapshotEntity``.

        Raises:
            ValueError: if ``res`` is given but its short name is not ``"se"``.
        """
        if res is not None and get_short_name(res) != "se":
            raise ValueError(f"Given res: <{res}> is inappropriate for a SnapshotEntity entity.")
        if res is not None and res in self.res_to_entity:
            entity = self.res_to_entity[res]
            assert isinstance(entity, SnapshotEntity)
            return entity

        # Snapshots live in a named graph derived from the subject: "<subject>/prov/".
        g_prov: str = str(prov_subject) + "/prov/"
        # The prefix is taken from the subject IRI, not from self.supplier_prefix.
        supplier_prefix = get_prefix(prov_subject.res)
        cur_g, count, label = self._add_prov(g_prov, "se", prov_subject, res, supplier_prefix)
        return SnapshotEntity(prov_subject, cur_g, self,
                              res, prov_subject.resp_agent, prov_subject.source, count, label)

    def _create_snapshot(self, cur_subj: GraphEntity, cur_time: str) -> SnapshotEntity:
        """Create a new snapshot of ``cur_subj`` generated at ``cur_time``.

        The primary source and the responsible agent are copied from the
        subject when they are set.
        """
        new_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj)
        new_snapshot.is_snapshot_of(cur_subj)
        new_snapshot.has_generation_time(cur_time)
        if cur_subj.source is not None:
            new_snapshot.has_primary_source(URIRef(cur_subj.source))
        if cur_subj.resp_agent is not None:
            new_snapshot.has_resp_agent(URIRef(cur_subj.resp_agent))
        return new_snapshot

    def _derive_snapshot(self, cur_subj: GraphEntity, last_snapshot_res: URIRef, cur_time: str,
                         invalidate_last: bool = True) -> SnapshotEntity:
        """Create a new snapshot of ``cur_subj`` derived from its previous one.

        Args:
            cur_subj: entity being described.
            last_snapshot_res: IRI of the most recent existing snapshot.
            cur_time: timestamp used for generation (and invalidation).
            invalidate_last: when true, the previous snapshot is marked as
                invalidated at ``cur_time``.

        Returns:
            The newly created snapshot, already linked to the previous one.
        """
        last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
        if invalidate_last:
            last_snapshot.has_invalidation_time(cur_time)
        cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
        cur_snapshot.derives_from(last_snapshot)
        return cur_snapshot

    def _get_snapshots_from_merge_list(self, cur_subj: GraphEntity) -> List[SnapshotEntity]:
        """Return the latest snapshot of every entity merged into ``cur_subj``.

        Entities of ``cur_subj.merge_list`` that have no snapshot yet are skipped.
        """
        snapshots_list: List[SnapshotEntity] = []
        for entity in cur_subj.merge_list:
            last_entity_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(entity.res)
            if last_entity_snapshot_res is not None:
                snapshots_list.append(self.add_se(prov_subject=entity, res=last_entity_snapshot_res))
        return snapshots_list

    @staticmethod
    def _get_merge_description(cur_subj: GraphEntity, snapshots_list: List[SnapshotEntity]) -> str:
        """Build the description of a merge snapshot.

        The result lists the subjects of ``snapshots_list`` in order, e.g.
        ``The entity 'A' has been merged with 'B', 'C'.``; with an empty list
        it is just ``The entity 'A' has been merged.``.
        """
        merge_description: str = f"The entity '{cur_subj.res}' has been merged"
        merged_with: str = ", ".join(f"'{snapshot.prov_subject.res}'" for snapshot in snapshots_list)
        if snapshots_list:
            merge_description += f" with {merged_with}"
        return merge_description + "."

    def generate_provenance(self, c_time: Optional[float] = None) -> set:
        """Generate the snapshots describing the current state of the graph set.

        Args:
            c_time: POSIX timestamp to use as generation time; the current
                time is used when ``None``. In both cases the time is expressed
                in UTC with microseconds dropped.

        Returns:
            The set of ``res`` values of the entities for which a new snapshot
            was created.
        """
        modified_entities: set = set()

        if c_time is None:
            moment = datetime.now(tz=timezone.utc)
        else:
            moment = datetime.fromtimestamp(c_time, tz=timezone.utc)
        cur_time: str = moment.replace(microsecond=0).isoformat(sep="T")

        # MERGED ENTITIES
        # NOTE(review): this pass reads the latest snapshot of every entity in
        # the merge lists, while the second pass may create new snapshots for
        # those same entities; the two passes are presumably kept separate and
        # in this order for that reason - confirm before merging them.
        for cur_subj in self.prov_g.res_to_entity.values():
            if cur_subj is None:
                continue
            merged_and_kept = cur_subj.was_merged and not cur_subj.to_be_deleted
            if not merged_and_kept:
                # Here we must skip every entity that was not merged or that must be deleted.
                continue

            # Previous snapshot
            last_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(cur_subj.res)
            if last_snapshot_res is None:
                # CREATION SNAPSHOT
                cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
                cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been created.")
                modified_entities.add(cur_subj.res)
                continue

            update_queries, _, _ = get_update_query(cur_subj, entity_type="graph")
            was_modified: bool = len(update_queries) > 0
            update_query: str = " ; ".join(update_queries) if update_queries else ""
            snapshots_list: List[SnapshotEntity] = self._get_snapshots_from_merge_list(cur_subj)

            if snapshots_list:
                # MERGE SNAPSHOT
                cur_snapshot = self._derive_snapshot(cur_subj, last_snapshot_res, cur_time)
                for snapshot in snapshots_list:
                    cur_snapshot.derives_from(snapshot)
                if update_query:
                    cur_snapshot.has_update_action(update_query)
                cur_snapshot.has_description(self._get_merge_description(cur_subj, snapshots_list))
                modified_entities.add(cur_subj.res)
            elif was_modified:
                # MODIFICATION SNAPSHOT (no merged entity has a snapshot to derive from)
                cur_snapshot = self._derive_snapshot(cur_subj, last_snapshot_res, cur_time)
                cur_snapshot.has_update_action(update_query)
                cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been modified.")
                modified_entities.add(cur_subj.res)

        # EVERY OTHER ENTITY
        for cur_subj in self.prov_g.res_to_entity.values():
            if cur_subj is None or (cur_subj.was_merged and not cur_subj.to_be_deleted):
                # Here we must skip every entity which was merged while not being marked as to be deleted,
                # since we already processed those entities in the previous loop.
                continue

            last_snapshot_res = self._retrieve_last_snapshot(cur_subj.res)
            if last_snapshot_res is None:
                if not cur_subj.to_be_deleted:
                    # CREATION SNAPSHOT
                    cur_snapshot = self._create_snapshot(cur_subj, cur_time)
                    cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been created.")
                    modified_entities.add(cur_subj.res)
                # Otherwise we can ignore this entity because it was deleted even before being created.
                continue

            update_queries, _, _ = get_update_query(cur_subj, entity_type="graph")
            was_modified = len(update_queries) > 0
            update_query = " ; ".join(update_queries) if update_queries else ""

            if cur_subj.to_be_deleted:
                # DELETION SNAPSHOT: the new snapshot is itself invalidated at creation time.
                cur_snapshot = self._derive_snapshot(cur_subj, last_snapshot_res, cur_time)
                cur_snapshot.has_invalidation_time(cur_time)
                cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been deleted.")
                cur_snapshot.has_update_action(update_query)
                modified_entities.add(cur_subj.res)
            elif cur_subj.is_restored:
                # RESTORATION SNAPSHOT
                # Don't set invalidation time on previous snapshot for restorations
                cur_snapshot = self._derive_snapshot(cur_subj, last_snapshot_res, cur_time,
                                                     invalidate_last=False)
                cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been restored.")
                if update_query:
                    cur_snapshot.has_update_action(update_query)
                modified_entities.add(cur_subj.res)
            elif was_modified:
                # MODIFICATION SNAPSHOT
                cur_snapshot = self._derive_snapshot(cur_subj, last_snapshot_res, cur_time)
                cur_snapshot.has_description(f"The entity '{cur_subj.res}' has been modified.")
                cur_snapshot.has_update_action(update_query)
                modified_entities.add(cur_subj.res)
        return modified_entities

    def _add_prov(self, graph_url: str, short_name: str, prov_subject: GraphEntity,
                  res: Optional[URIRef] = None, supplier_prefix: str = "") -> Tuple[Graph, Optional[str], Optional[str]]:
        """Prepare the graph, the counter value and the label for a provenance entity.

        Args:
            graph_url: identifier of the ``Graph`` to create.
            short_name: provenance short name, used to look up :attr:`labels`.
            prov_subject: entity the provenance entity describes.
            res: IRI of an existing provenance entity. When given, no new
                number is allocated: the stored counter is only raised to the
                number found in ``res`` if it is currently lower.
            supplier_prefix: prefix forwarded to the counter handler.

        Returns:
            ``(graph, count, label)``. ``count`` and ``label`` are ``None``
            when ``res`` is given; ``label`` is also ``None`` when labels are
            not wanted.
        """
        cur_g: Graph = Graph(identifier=graph_url)
        self._set_ns(cur_g)

        count: Optional[str] = None
        label: Optional[str] = None

        if res is not None:
            try:
                res_count: int = int(get_count(res))
            except ValueError:
                # Unparsable number: -1 can never exceed a stored counter.
                res_count = -1

            if isinstance(self.counter_handler, SqliteCounterHandler):
                cur_count: int = self.counter_handler.read_counter(str(prov_subject))
            else:
                cur_count = self.counter_handler.read_counter(prov_subject.short_name, "se", int(get_count(prov_subject.res)), supplier_prefix=supplier_prefix)

            if res_count > cur_count:
                if isinstance(self.counter_handler, SqliteCounterHandler):
                    # Store the snapshot number carried by `res`, consistently
                    # with the other handlers (the subject's own number was
                    # stored here before).
                    # NOTE(review): relies on set_counter(new_value, entity_name) - confirm.
                    self.counter_handler.set_counter(res_count, str(prov_subject))
                else:
                    self.counter_handler.set_counter(res_count, prov_subject.short_name, "se", int(get_count(prov_subject.res)), supplier_prefix=supplier_prefix)
            return cur_g, count, label

        if isinstance(self.counter_handler, SqliteCounterHandler):
            count = str(self.counter_handler.increment_counter(str(prov_subject)))
        else:
            count = str(self.counter_handler.increment_counter(prov_subject.short_name, "se", int(get_count(prov_subject.res)), supplier_prefix=supplier_prefix))

        if self.wanted_label:
            cur_short_name = prov_subject.short_name
            cur_entity_count = get_count(prov_subject.res)
            cur_entity_prefix = get_prefix(prov_subject.res)

            related_to_label = "related to %s %s%s" % (GraphSet.labels[cur_short_name], cur_entity_prefix,
                                                       cur_entity_count)
            related_to_short_label = "-> %s/%s%s" % (cur_short_name, cur_entity_prefix, cur_entity_count)

            label = "%s %s %s [%s/%s %s]" % (self.labels[short_name], count, related_to_label, short_name, count,
                                             related_to_short_label)

        return cur_g, count, label

    @staticmethod
    def _set_ns(g: Graph) -> None:
        """Bind the ``prov`` prefix to ``ProvEntity.PROV`` in ``g``."""
        g.namespace_manager.bind("prov", ProvEntity.PROV)

    def _retrieve_last_snapshot(self, prov_subject: URIRef) -> Optional[URIRef]:
        """Return the IRI of the most recent snapshot of ``prov_subject``.

        Args:
            prov_subject: IRI of the described entity.

        Returns:
            ``<prov_subject>/prov/se/<n>`` where ``n`` is the counter stored
            for the entity, or ``None`` when that counter is not positive.

        Raises:
            ValueError: if no integer count can be extracted from
                ``prov_subject``, or if the extracted count is not positive.
        """
        subj_short_name: str = get_short_name(prov_subject)
        try:
            subj_count: int = int(get_count(prov_subject))
        except ValueError as err:
            raise ValueError('prov_subject is not a valid URIRef. Unable to extract the count value!') from err
        # Checked outside the try block so that this message is not replaced
        # by the generic one above.
        if subj_count <= 0:
            raise ValueError('prov_subject is not a valid URIRef. Extracted count value should be a positive '
                             'non-zero integer number!')

        supplier_prefix = get_prefix(prov_subject)

        if isinstance(self.counter_handler, SqliteCounterHandler):
            last_snapshot_count: str = str(self.counter_handler.read_counter(str(prov_subject)))
        else:
            last_snapshot_count = str(self.counter_handler.read_counter(subj_short_name, "se", subj_count, supplier_prefix=supplier_prefix))

        if int(last_snapshot_count) <= 0:
            return None
        return URIRef(str(prov_subject) + '/prov/se/' + last_snapshot_count)

    def get_se(self) -> Tuple[SnapshotEntity, ...]:
        """Return every registered entity that is a ``SnapshotEntity``."""
        return tuple(entity for entity in self.res_to_entity.values() if isinstance(entity, SnapshotEntity))