Coverage for rdflib_ocdm / prov / provenance.py: 94%
119 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 12:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 12:35 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2023-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7from __future__ import annotations
9from typing import TYPE_CHECKING
11if TYPE_CHECKING:
12 from rdflib_ocdm.ocdm_graph import OCDMGraphCommons
13 from typing import List, Dict, Optional
15from collections import OrderedDict
16from datetime import datetime, timezone
18from rdflib import Dataset, URIRef
20from rdflib_ocdm.counter_handler.counter_handler import CounterHandler
21from rdflib_ocdm.counter_handler.in_memory_counter_handler import \
22 InMemoryCounterHandler
23from rdflib_ocdm.prov.prov_entity import ProvEntity
24from rdflib_ocdm.prov.snapshot_entity import SnapshotEntity
25from rdflib_ocdm.query_utils import get_update_query
26from rdflib_ocdm.support import get_prov_count
class OCDMProvenance(Dataset):
    """Dataset that builds provenance snapshots for the entities of an OCDM graph.

    Snapshots are produced by :meth:`generate_provenance` from the
    ``entity_index`` and ``merge_index`` of the wrapped graph. Snapshot
    numbering is delegated to a counter handler.
    """

    def __init__(self, prov_subj_graph: OCDMGraphCommons, counter_handler: Optional[CounterHandler] = None):
        """Initialise an empty provenance dataset bound to *prov_subj_graph*.

        Args:
            prov_subj_graph: graph whose ``entity_index`` and ``merge_index``
                drive snapshot generation.
            counter_handler: handler used to read, set and increment the
                per-subject snapshot counters. When omitted, an
                ``InMemoryCounterHandler`` is created.
        """
        Dataset.__init__(self)
        self.prov_g = prov_subj_graph
        # Maps a URIRef to the related provenance entity.
        self.res_to_entity: Dict[URIRef, ProvEntity] = dict()
        self.all_entities = set()
        if counter_handler is None:
            counter_handler = InMemoryCounterHandler()
        self.counter_handler = counter_handler

    def generate_provenance(self, c_time: Optional[float] = None) -> None:
        """Create one new snapshot for every entity that needs it.

        For each entry of ``self.prov_g.entity_index`` one of the following
        snapshots is produced:

        * deletion, when the entry is flagged ``to_be_deleted``;
        * restoration, when the entry is flagged ``is_restored``;
        * creation, when no previous snapshot exists for the entity;
        * merge, when the entity has merged entities with a previous snapshot;
        * modification, when there is a non-empty update query.

        Entities with a previous snapshot, nothing merged and an empty update
        query get no new snapshot.

        Args:
            c_time: POSIX timestamp used as generation/invalidation time.
                When omitted, the current time is used. In both cases the
                time is expressed in UTC and truncated to the second.
        """
        if c_time is None:
            moment = datetime.now(tz=timezone.utc)
        else:
            moment = datetime.fromtimestamp(c_time, tz=timezone.utc)
        cur_time: str = moment.replace(microsecond=0).isoformat(sep="T")

        merge_index = self.prov_g.merge_index
        # The sort is stable, so entities keep their index order, with the
        # ones flagged for deletion moved after all the others.
        prov_g_subjects = sorted(
            self.prov_g.entity_index.items(),
            key=lambda x: not x[1]['to_be_deleted'],
            reverse=True,
        )
        for cur_subj, cur_subj_metadata in prov_g_subjects:
            last_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(str(cur_subj))
            if cur_subj_metadata['to_be_deleted']:
                # DELETION SNAPSHOT
                update_query: str = get_update_query(self.prov_g, cur_subj)[0]
                cur_snapshot = self._derive_snapshot(cur_subj, cur_time, last_snapshot_res)
                cur_snapshot.has_invalidation_time(cur_time)
                cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been deleted.")
                cur_snapshot.has_update_action(update_query)
            elif cur_subj_metadata['is_restored']:
                # RESTORATION SNAPSHOT
                # The invalidation time of the previous snapshot is not set
                # when an entity is restored.
                cur_snapshot = self._derive_snapshot(
                    cur_subj, cur_time, last_snapshot_res, invalidate_previous=False)
                cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been restored.")
                update_query = get_update_query(self.prov_g, cur_subj)[0]
                if update_query:
                    cur_snapshot.has_update_action(update_query)
            elif last_snapshot_res is None:
                # CREATION SNAPSHOT
                cur_snapshot = self._create_snapshot(cur_subj, cur_time)
                cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been created.")
            else:
                update_query = get_update_query(self.prov_g, cur_subj)[0]
                # Direct key lookup instead of scanning the whole merge index
                # for every subject.
                if cur_subj in merge_index:
                    cur_subj_merge_index = {cur_subj: merge_index[cur_subj]}
                else:
                    cur_subj_merge_index = {}
                snapshots_list = self._get_snapshots_from_merge_list(cur_subj_merge_index)
                if snapshots_list:
                    # MERGE SNAPSHOT
                    cur_snapshot = self._derive_snapshot(cur_subj, cur_time, last_snapshot_res)
                    for snapshot in snapshots_list:
                        cur_snapshot.derives_from(snapshot)
                    if update_query:
                        cur_snapshot.has_update_action(update_query)
                    cur_snapshot.has_description(self._get_merge_description(cur_subj, snapshots_list))
                elif update_query:
                    # MODIFICATION SNAPSHOT
                    cur_snapshot = self._derive_snapshot(cur_subj, cur_time, last_snapshot_res)
                    cur_snapshot.has_description(f"The entity '{str(cur_subj)}' was modified.")
                    cur_snapshot.has_update_action(update_query)

    def _derive_snapshot(self, cur_subj: URIRef, cur_time: str, last_snapshot_res: Optional[URIRef],
                         invalidate_previous: bool = True) -> SnapshotEntity:
        """Create a new snapshot of *cur_subj* derived from its previous one.

        Args:
            cur_subj: entity the snapshot refers to.
            cur_time: generation time of the new snapshot; also used as
                invalidation time of the previous one.
            last_snapshot_res: resource of the previous snapshot, passed to
                :meth:`add_se` as ``res``.
            invalidate_previous: whether to set the invalidation time on the
                previous snapshot.

        Returns:
            The newly created snapshot.
        """
        last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
        if invalidate_previous:
            last_snapshot.has_invalidation_time(cur_time)
        cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
        cur_snapshot.derives_from(last_snapshot)
        return cur_snapshot

    @staticmethod
    def _get_merge_description(cur_subj: URIRef, snapshots_list: List[SnapshotEntity]) -> str:
        """Return the description of a merge snapshot.

        The text lists the ``prov_subject`` of every snapshot in
        *snapshots_list*, e.g. ``The entity 'a' was merged with 'b', 'c'.``.
        With an empty list the result is ``The entity 'a' was merged.``.
        """
        merge_description: str = f"The entity '{str(cur_subj)}' was merged"
        merged_subjects = ", ".join(f"'{snapshot.prov_subject}'" for snapshot in snapshots_list)
        if merged_subjects:
            merge_description += f" with {merged_subjects}"
        return merge_description + "."

    def _retrieve_last_snapshot(self, prov_subject: URIRef) -> Optional[URIRef]:
        """Return the resource of the latest snapshot of *prov_subject*.

        The snapshot number is read from the counter handler. ``None`` is
        returned when the counter is not positive.
        """
        last_snapshot_count: str = str(self.counter_handler.read_counter(str(prov_subject)))
        if int(last_snapshot_count) <= 0:
            return None
        return URIRef(str(prov_subject) + '/prov/se/' + last_snapshot_count)

    def _create_snapshot(self, cur_subj: URIRef, cur_time: str) -> SnapshotEntity:
        """Create a new snapshot of *cur_subj* generated at *cur_time*.

        The primary source and the responsible agent are taken from the
        entity index of the wrapped graph and set only when not ``None``.
        """
        new_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj)
        new_snapshot.is_snapshot_of(cur_subj)
        new_snapshot.has_generation_time(cur_time)
        subj_metadata = self.prov_g.entity_index[cur_subj]
        source = subj_metadata['source']
        resp_agent = subj_metadata['resp_agent']
        if source is not None:
            new_snapshot.has_primary_source(URIRef(source))
        if resp_agent is not None:
            new_snapshot.has_resp_agent(URIRef(resp_agent))
        return new_snapshot

    def _get_snapshots_from_merge_list(self, cur_subj_merge_index: dict) -> List[SnapshotEntity]:
        """Return the latest snapshot of every merged entity that has one.

        Args:
            cur_subj_merge_index: mapping whose values are iterables of
                merged entities; the keys are ignored.
        """
        snapshots_list: List[SnapshotEntity] = []
        for merge_entities in cur_subj_merge_index.values():
            for merge_entity in merge_entities:
                last_entity_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(merge_entity)
                if last_entity_snapshot_res is not None:
                    snapshots_list.append(self.add_se(prov_subject=merge_entity, res=last_entity_snapshot_res))
        return snapshots_list

    def add_se(self, prov_subject: URIRef, res: Optional[URIRef] = None) -> SnapshotEntity:
        """Return the snapshot entity for *res*, or build a new one.

        When *res* is already registered in ``res_to_entity`` the registered
        entity is returned. Otherwise a ``SnapshotEntity`` is built with the
        count computed by :meth:`_add_prov`.
        """
        if res is not None and res in self.res_to_entity:
            return self.res_to_entity[res]
        count = self._add_prov(str(prov_subject), res)
        return SnapshotEntity(str(prov_subject), self, count)

    def _add_prov(self, prov_subject: str, res: Optional[URIRef]) -> str:
        """Return the snapshot number to use for *prov_subject*, as a string.

        With an existing *res*, its number is extracted with
        ``get_prov_count`` and the stored counter is raised to it when it is
        lower. Without *res*, the counter is incremented and the new value
        is returned.
        """
        if res is None:
            return str(self.counter_handler.increment_counter(prov_subject))
        res_count: int = int(get_prov_count(res))
        if res_count > self.counter_handler.read_counter(prov_subject):
            self.counter_handler.set_counter(res_count, prov_subject)
        return str(res_count)

    def get_entity(self, res: str) -> Optional[ProvEntity]:
        """Return the entity registered for *res*, or ``None`` when unknown."""
        return self.res_to_entity.get(res)