Coverage for rdflib_ocdm/prov/provenance.py: 92%
122 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-11-01 22:02 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-11-01 22:02 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright 2023 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
17from __future__ import annotations
19from typing import TYPE_CHECKING
21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true
22 from rdflib_ocdm.ocdm_graph import OCDMGraphCommons
23 from typing import List, Dict, Optional
25from collections import OrderedDict
26from datetime import datetime, timezone
28from rdflib import Dataset, URIRef
30from rdflib_ocdm.counter_handler.counter_handler import CounterHandler
31from rdflib_ocdm.counter_handler.in_memory_counter_handler import \
32 InMemoryCounterHandler
33from rdflib_ocdm.prov.prov_entity import ProvEntity
34from rdflib_ocdm.prov.snapshot_entity import SnapshotEntity
35from rdflib_ocdm.query_utils import get_update_query
36from rdflib_ocdm.support import get_prov_count
39class OCDMProvenance(Dataset):
def __init__(self, prov_subj_graph: OCDMGraphCommons, counter_handler: Optional[CounterHandler] = None):
    """Create an empty provenance dataset bound to an OCDM graph.

    :param prov_subj_graph: the graph whose tracked entities
        (``entity_index``) and merges (``merge_index``) provenance
        snapshots are generated for.
    :param counter_handler: store used to read/increment/set snapshot
        counters per entity; a fresh ``InMemoryCounterHandler`` is used
        when omitted.
    """
    Dataset.__init__(self)
    self.prov_g = prov_subj_graph
    # Maps a snapshot URIRef to the provenance entity already built for it;
    # add_se returns the cached entity instead of building a new one.
    self.res_to_entity: Dict[URIRef, ProvEntity] = dict()
    # NOTE(review): not written by any method visible here — presumably
    # populated by the provenance entities themselves; confirm.
    self.all_entities = set()
    self.counter_handler = counter_handler if counter_handler is not None else InMemoryCounterHandler()
def generate_provenance(self, c_time: Optional[float] = None) -> None:
    """Add one new snapshot for every entity tracked by ``self.prov_g``.

    Each entry of ``self.prov_g.entity_index`` is classified from its
    metadata and from its snapshot counter:

    * ``to_be_deleted``       -> deletion snapshot (previous one invalidated);
    * ``is_restored``         -> restoration snapshot (previous one left valid);
    * no previous snapshot    -> creation snapshot;
    * otherwise               -> merge snapshot when ``self.prov_g.merge_index``
      lists entities merged into it, modification snapshot when only its
      update query is non-empty, and nothing at all when neither applies.

    :param c_time: POSIX timestamp used for every generation/invalidation
        time written in this call; the current UTC time when omitted.
        Microseconds are dropped.
    """
    if c_time is None:
        moment = datetime.now(tz=timezone.utc)
    else:
        moment = datetime.fromtimestamp(c_time, tz=timezone.utc)
    # A single ISO 8601 timestamp (seconds precision) shared by all snapshots.
    cur_time: str = moment.replace(microsecond=0).isoformat(sep="T")
    merge_index = self.prov_g.merge_index
    # Subjects NOT marked 'to_be_deleted' are processed first (False < True);
    # sorted() is stable, so entity_index order is kept inside each group.
    # Consequence: a merge target reads the last snapshot of the entities merged
    # into it (via their counters) before their deletion snapshots bump them.
    prov_g_subjects = sorted(
        self.prov_g.entity_index.items(),
        key=lambda item: bool(item[1]['to_be_deleted']),
    )
    for cur_subj, cur_subj_metadata in prov_g_subjects:
        last_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(str(cur_subj))
        if cur_subj_metadata['to_be_deleted']:
            # DELETION SNAPSHOT: invalidate the previous snapshot and add one that
            # is generated and invalidated at the same instant.
            update_query: str = get_update_query(self.prov_g, cur_subj)[0]
            last_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
            last_snapshot.has_invalidation_time(cur_time)
            cur_snapshot: SnapshotEntity = self._create_snapshot(cur_subj, cur_time)
            cur_snapshot.derives_from(last_snapshot)
            cur_snapshot.has_invalidation_time(cur_time)
            cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been deleted.")
            cur_snapshot.has_update_action(update_query)
        elif cur_subj_metadata['is_restored']:
            # RESTORATION SNAPSHOT
            last_snapshot = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
            # The previous snapshot's invalidation time is deliberately not set on restore.
            cur_snapshot = self._create_snapshot(cur_subj, cur_time)
            cur_snapshot.derives_from(last_snapshot)
            cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been restored.")
            update_query = get_update_query(self.prov_g, cur_subj)[0]
            if update_query:
                cur_snapshot.has_update_action(update_query)
        elif last_snapshot_res is None:
            # CREATION SNAPSHOT: first snapshot ever counted for this entity.
            cur_snapshot = self._create_snapshot(cur_subj, cur_time)
            cur_snapshot.has_description(f"The entity '{str(cur_subj)}' has been created.")
        else:
            update_query = get_update_query(self.prov_g, cur_subj)[0]
            # Direct lookup of this subject's merge entry; the previous code
            # scanned the whole merge_index once per subject.
            if cur_subj in merge_index:
                cur_subj_merge_index = {cur_subj: merge_index[cur_subj]}
            else:
                cur_subj_merge_index = {}
            snapshots_list = self._get_snapshots_from_merge_list(cur_subj_merge_index)
            if snapshots_list:
                # MERGE SNAPSHOT: derives from its own last snapshot and from the
                # last snapshot of every merged entity.
                last_snapshot = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                last_snapshot.has_invalidation_time(cur_time)
                cur_snapshot = self._create_snapshot(cur_subj, cur_time)
                cur_snapshot.derives_from(last_snapshot)
                for snapshot in snapshots_list:
                    cur_snapshot.derives_from(snapshot)
                if update_query:
                    cur_snapshot.has_update_action(update_query)
                cur_snapshot.has_description(self._get_merge_description(cur_subj, snapshots_list))
            elif update_query:
                # MODIFICATION SNAPSHOT
                last_snapshot = self.add_se(prov_subject=cur_subj, res=last_snapshot_res)
                last_snapshot.has_invalidation_time(cur_time)
                cur_snapshot = self._create_snapshot(cur_subj, cur_time)
                cur_snapshot.derives_from(last_snapshot)
                cur_snapshot.has_description(f"The entity '{str(cur_subj)}' was modified.")
                cur_snapshot.has_update_action(update_query)
            # Otherwise: no change and no merge -> no new snapshot is added.
111 @staticmethod
112 def _get_merge_description(cur_subj: URIRef, snapshots_list: List[SnapshotEntity]) -> str:
113 merge_description: str = f"The entity '{str(cur_subj)}' was merged"
114 is_first: bool = True
115 for snapshot in snapshots_list:
116 if is_first: 116 ↛ 120line 116 didn't jump to line 120 because the condition on line 116 was always true
117 merge_description += f" with '{snapshot.prov_subject}'"
118 is_first = False
119 else:
120 merge_description += f", '{snapshot.prov_subject}'"
121 merge_description += "."
122 return merge_description
124 def _retrieve_last_snapshot(self, prov_subject: URIRef) -> Optional[URIRef]:
125 last_snapshot_count: str = str(self.counter_handler.read_counter(str(prov_subject)))
126 if int(last_snapshot_count) <= 0:
127 return None
128 else:
129 return URIRef(str(prov_subject) + '/prov/se/' + last_snapshot_count)
131 def _create_snapshot(self, cur_subj: URIRef, cur_time: str) -> SnapshotEntity:
132 new_snapshot: SnapshotEntity = self.add_se(prov_subject=cur_subj)
133 new_snapshot.is_snapshot_of(cur_subj)
134 new_snapshot.has_generation_time(cur_time)
135 source = self.prov_g.entity_index[cur_subj]['source']
136 resp_agent = self.prov_g.entity_index[cur_subj]['resp_agent']
137 if source is not None:
138 new_snapshot.has_primary_source(URIRef(source))
139 if resp_agent is not None:
140 new_snapshot.has_resp_agent(URIRef(resp_agent))
141 return new_snapshot
143 def _get_snapshots_from_merge_list(self, cur_subj_merge_index: dict) -> List[SnapshotEntity]:
144 snapshots_list: List[SnapshotEntity] = []
145 for _, merge_entities in cur_subj_merge_index.items():
146 for merge_entity in merge_entities:
147 last_entity_snapshot_res: Optional[URIRef] = self._retrieve_last_snapshot(merge_entity)
148 if last_entity_snapshot_res is not None: 148 ↛ 146line 148 didn't jump to line 146 because the condition on line 148 was always true
149 snapshots_list.append(self.add_se(prov_subject=merge_entity, res=last_entity_snapshot_res))
150 return snapshots_list
def add_se(self, prov_subject: URIRef, res: Optional[URIRef] = None) -> SnapshotEntity:
    """Return the snapshot entity for *res*, or build a new one.

    :param prov_subject: the entity the snapshot describes.
    :param res: URI of an existing snapshot. If ``self.res_to_entity``
        already holds an entity for it, that object is returned unchanged.
        Otherwise the snapshot number comes from ``_add_prov``: taken from
        *res* when given, or a freshly incremented counter when omitted.
    """
    if res is not None and res in self.res_to_entity:
        return self.res_to_entity[res]
    count = self._add_prov(str(prov_subject), res)
    # NOTE(review): the new entity is not stored in res_to_entity here;
    # presumably SnapshotEntity/ProvEntity registers itself — confirm,
    # otherwise the cache lookup above can never hit.
    return SnapshotEntity(str(prov_subject), self, count)
159 def _add_prov(self, prov_subject: str, res: URIRef) -> Optional[str]:
160 if res is not None:
161 res_count: int = int(get_prov_count(res))
162 if res_count > self.counter_handler.read_counter(prov_subject): 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true
163 self.counter_handler.set_counter(res_count, prov_subject)
164 return str(res_count)
165 return str(self.counter_handler.increment_counter(prov_subject))
167 def get_entity(self, res: str) -> Optional[ProvEntity]:
168 if res in self.res_to_entity:
169 return self.res_to_entity[res]