Coverage for heritrace / editor.py: 99%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from dataclasses import dataclass 

6from datetime import datetime, timezone 

7 

8from flask import current_app 

9from rdflib import Literal, URIRef 

10from rdflib_ocdm.counter_handler.counter_handler import CounterHandler 

11from rdflib_ocdm.ocdm_graph import OCDMDataset, OCDMGraph 

12from rdflib_ocdm.reader import Reader 

13from rdflib_ocdm.storer import Storer 

14from SPARQLWrapper import JSON 

15 

16from heritrace.sparql import SPARQLWrapperWithRetry, get_sparql_bindings 

17 

18 

19@dataclass(frozen=True, slots=True) 

20class EndpointConfig: 

21 dataset: str 

22 provenance: str 

23 is_quadstore: bool = True 

24 

25 

class EditorError(Exception):
    """Raised by ``Editor`` when the triples or entity to change are not in its graph."""

28 

29 

class Editor:
    """Stage RDF edits in a local OCDM graph and upload them with provenance.

    Edits made through :meth:`create`, :meth:`update` and :meth:`delete` are
    applied to ``self.g_set`` only; nothing is uploaded to the endpoints until
    :meth:`save` is called (:meth:`merge` calls it itself).
    """

    def __init__(
        self,
        endpoints: EndpointConfig,
        counter_handler: CounterHandler,
        resp_agent: URIRef,
        source: URIRef | None = None,
        c_time: datetime | None = None,
    ) -> None:
        """Set up the working graph for one editing session.

        Args:
            endpoints: Dataset/provenance endpoints and the quadstore flag.
            counter_handler: Handed to the ``OCDMDataset``/``OCDMGraph``
                constructor.
            resp_agent: Passed as ``resp_agent`` whenever statements are
                added and when the pre-existing snapshot is closed.
            source: Passed as ``primary_source`` alongside ``resp_agent``.
            c_time: Optional creation time; stored as a POSIX timestamp
                (see :meth:`to_posix_timestamp`).
        """
        self.dataset_endpoint = endpoints.dataset
        self.provenance_endpoint = endpoints.provenance
        self.counter_handler = counter_handler
        self.resp_agent = resp_agent
        self.source = source
        self.c_time = self.to_posix_timestamp(c_time)
        self.dataset_is_quadstore = endpoints.is_quadstore
        self.g_set = (
            OCDMDataset(self.counter_handler)
            if self.dataset_is_quadstore
            else OCDMGraph(self.counter_handler)
        )

    def _as_statement(
        self,
        subject: URIRef,
        predicate: URIRef,
        value: Literal | URIRef,
        graph: URIRef | None,
    ) -> tuple[URIRef | Literal, ...]:
        """Return a quad when a graph is given on a quadstore, else a triple."""
        if self.dataset_is_quadstore and graph:
            return (subject, predicate, value, graph)
        return (subject, predicate, value)

    def _remove_existing(self, statement: tuple[URIRef | Literal, ...]) -> None:
        """Remove ``statement`` from the working graph.

        Raises:
            EditorError: If ``statement`` is not in the working graph.
        """
        if statement not in self.g_set:  # type: ignore[operator]
            terms = ", ".join(f"{term}" for term in statement)
            msg = f"Triple ({terms}) does not exist"
            raise EditorError(msg)
        self.g_set.remove(statement)  # type: ignore[arg-type]

    def create(
        self,
        subject: URIRef,
        predicate: URIRef,
        value: Literal | URIRef,
        graph: URIRef | None = None,
    ) -> None:
        """Add one statement to the working graph.

        The statement is added as a quad only when the dataset is a quadstore
        and ``graph`` is given; otherwise it is added as a plain triple.
        """
        self.g_set.add(  # type: ignore[arg-type]
            self._as_statement(subject, predicate, value, graph),  # type: ignore[arg-type]
            resp_agent=self.resp_agent,
            primary_source=self.source,
        )

    def update(
        self,
        subject: URIRef,
        predicate: URIRef,
        old_value: Literal | URIRef,
        new_value: Literal | URIRef,
        graph: URIRef | None = None,
    ) -> None:
        """Replace ``old_value`` with ``new_value`` for ``subject``/``predicate``.

        Raises:
            EditorError: If the statement holding ``old_value`` is not in the
                working graph.
        """
        self._remove_existing(self._as_statement(subject, predicate, old_value, graph))
        self.g_set.add(  # type: ignore[arg-type]
            self._as_statement(subject, predicate, new_value, graph),  # type: ignore[arg-type]
            resp_agent=self.resp_agent,
            primary_source=self.source,
        )

    def _delete_full_entity(self, subject: URIRef) -> None:
        """Remove every statement that has ``subject`` as subject or object.

        Raises:
            EditorError: If ``subject`` has no outgoing statements.
        """
        # Matches are materialised with list() before removal so the graph is
        # not modified while it is being iterated. Incoming statements are
        # looked up only after the outgoing ones have been removed.
        if self.dataset_is_quadstore:
            outgoing = list(self.g_set.quads((subject, None, None, None)))  # type: ignore[arg-type]
            if not outgoing:
                msg = f"Entity {subject} does not exist"
                raise EditorError(msg)
            for quad in outgoing:
                self.g_set.remove(quad)  # type: ignore[arg-type]

            incoming = list(self.g_set.quads((None, None, subject, None)))  # type: ignore[arg-type]
            for quad in incoming:
                self.g_set.remove(quad)  # type: ignore[arg-type]
        else:
            outgoing = list(self.g_set.triples((subject, None, None)))  # type: ignore[arg-type]
            if not outgoing:
                msg = f"Entity {subject} does not exist"
                raise EditorError(msg)
            for triple in outgoing:
                self.g_set.remove(triple)  # type: ignore[arg-type]

            incoming = list(self.g_set.triples((None, None, subject)))  # type: ignore[arg-type]
            for triple in incoming:
                self.g_set.remove(triple)  # type: ignore[arg-type]
        self.g_set.mark_as_deleted(subject)  # type: ignore[arg-type]

    def _delete_specific_triple(
        self,
        subject: URIRef,
        predicate: URIRef,
        value: Literal | URIRef,
        graph: URIRef | None,
    ) -> None:
        """Remove exactly one statement.

        Raises:
            EditorError: If the statement is not in the working graph.
        """
        self._remove_existing(self._as_statement(subject, predicate, value, graph))

    def _delete_all_for_predicate(
        self,
        subject: URIRef,
        predicate: URIRef,
        graph: URIRef | None,
    ) -> None:
        """Remove every value of ``predicate`` for ``subject``.

        The search is limited to ``graph`` only when the dataset is a
        quadstore and ``graph`` is given.

        Raises:
            EditorError: If no matching statement is found.
        """
        if self.dataset_is_quadstore and graph:
            quads = list(self.g_set.quads((subject, predicate, None, graph)))  # type: ignore[arg-type]
            if not quads:
                msg = (
                    f"No triples found with subject"
                    f" {subject} and predicate"
                    f" {predicate} in graph {graph}"
                )
                raise EditorError(msg)
            for quad in quads:
                self.g_set.remove(quad)  # type: ignore[arg-type]
        else:
            triples = list(self.g_set.triples((subject, predicate, None)))  # type: ignore[arg-type]
            if not triples:
                msg = (
                    f"No triples found with subject {subject} and predicate {predicate}"
                )
                raise EditorError(msg)
            for triple in triples:
                self.g_set.remove(triple)  # type: ignore[arg-type]

    def delete(
        self,
        subject: URIRef,
        predicate: URIRef | None = None,
        value: Literal | URIRef | None = None,
        graph: URIRef | None = None,
    ) -> None:
        """Delete an entity, one statement, or all values of a predicate.

        * ``predicate is None``: the whole entity is removed.
        * ``value`` given: only that statement is removed.
        * otherwise: every value of ``predicate`` is removed.

        When no statement with ``subject`` as subject is left afterwards, the
        subject is marked as deleted on the working graph.

        Raises:
            EditorError: If nothing matching the request is in the working
                graph.
        """
        if predicate is None:
            self._delete_full_entity(subject)
        # Compare against None explicitly: rdflib literals such as
        # Literal(0), Literal(False) or Literal("") are falsy, and a plain
        # truthiness test would wipe every value of the predicate instead of
        # the single requested one.
        elif value is not None:
            self._delete_specific_triple(subject, predicate, value, graph)
        else:
            self._delete_all_for_predicate(subject, predicate, graph)

        # Imported locally - presumably to avoid an import cycle; confirm
        # before moving it to module level.
        from heritrace.utils.sparql_utils import get_triples_from_graph  # noqa: PLC0415

        remaining = get_triples_from_graph(self.g_set, (subject, None, None))
        if not any(True for _ in remaining):
            self.g_set.mark_as_deleted(subject)  # type: ignore[arg-type]

    def import_entity(self, subject: URIRef) -> None:
        """Load ``subject`` from the dataset endpoint into the working graph."""
        Reader.import_entities_from_triplestore(
            self.g_set,
            self.dataset_endpoint,
            [subject],  # type: ignore[arg-type]
        )

    def merge(self, keep_entity_uri: URIRef, delete_entity_uri: URIRef) -> None:
        """Merge ``delete_entity_uri`` into ``keep_entity_uri`` and save.

        Both entities, every subject that points at the entity to delete
        (other than the kept entity), and every URI object of the entity to
        delete (``rdf:type`` values excluded) are imported from the dataset
        endpoint before the merge is delegated to the working graph.

        Raises:
            ValueError: If both arguments are the same URI.
        """
        if keep_entity_uri == delete_entity_uri:
            msg = "Cannot merge an entity with itself."
            raise ValueError(msg)

        merge_sparql = SPARQLWrapperWithRetry(self.dataset_endpoint)
        # Always holds at least the two merge participants.
        entities_to_import: set[URIRef] = {keep_entity_uri, delete_entity_uri}

        query_incoming = (
            "SELECT DISTINCT ?s ?p WHERE {"
            f" ?s ?p <{delete_entity_uri}> ."
            f" FILTER (?s != <{keep_entity_uri}>) }}"
        )
        merge_sparql.setQuery(query_incoming)
        merge_sparql.setReturnFormat(JSON)
        for binding in get_sparql_bindings(merge_sparql.query().convert()):
            entities_to_import.add(URIRef(binding["s"]["value"]))

        query_outgoing = f"""
            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
            SELECT DISTINCT ?p ?o WHERE {{
                <{delete_entity_uri}> ?p ?o .
                FILTER (?p != rdf:type)
            }}
        """
        merge_sparql.setQuery(query_outgoing)
        merge_sparql.setReturnFormat(JSON)
        for binding in get_sparql_bindings(merge_sparql.query().convert()):
            o_node = binding["o"]
            o_type = o_node["type"]
            if o_type == "uri":
                entities_to_import.add(URIRef(o_node["value"]))
            elif o_type not in {"literal", "typed-literal"}:
                # Literal objects need no import; anything else is reported.
                current_app.logger.warning(
                    "Skipping non-URI/Literal object type '%s' from %s via %s",
                    o_type,
                    delete_entity_uri,
                    URIRef(binding["p"]["value"]),
                )

        Reader.import_entities_from_triplestore(
            self.g_set,
            self.dataset_endpoint,
            list(entities_to_import),  # type: ignore[arg-type]
        )
        self.g_set.preexisting_finished(self.resp_agent, self.source, self.c_time)  # type: ignore[arg-type]

        self.g_set.merge(keep_entity_uri, delete_entity_uri)  # type: ignore[arg-type]

        self.save()

    def preexisting_finished(self) -> None:
        """Call ``preexisting_finished`` on the working graph.

        The stored responsible agent, primary source and creation timestamp
        are passed along.
        """
        self.g_set.preexisting_finished(self.resp_agent, self.source, self.c_time)  # type: ignore[arg-type]

    def save(self) -> None:
        """Generate provenance, upload data and provenance, then commit.

        The working graph is uploaded to the dataset endpoint and its
        ``provenance`` to the provenance endpoint, in that order, before
        ``commit_changes`` is called on the working graph.
        """
        self.g_set.generate_provenance()  # type: ignore[arg-type]
        dataset_storer = Storer(self.g_set)  # type: ignore[arg-type]
        prov_storer = Storer(self.g_set.provenance)  # type: ignore[attr-defined]
        dataset_storer.upload_all(self.dataset_endpoint)  # type: ignore[arg-type]
        prov_storer.upload_all(self.provenance_endpoint)  # type: ignore[arg-type]
        self.g_set.commit_changes()  # type: ignore[arg-type]

    def to_posix_timestamp(self, value: str | datetime | None) -> float | None:
        """Convert an ISO-8601 string or a ``datetime`` to a POSIX timestamp.

        Strings without a UTC offset are interpreted as UTC.

        NOTE(review): naive ``datetime`` objects go straight to
        ``datetime.timestamp()``, which reads them as local time - unlike
        naive strings. Confirm that callers only pass aware datetimes.

        Returns:
            The timestamp, or ``None`` when ``value`` is ``None`` or of any
            other type.
        """
        if isinstance(value, str):
            parsed = datetime.fromisoformat(value)
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed.timestamp()
        if isinstance(value, datetime):
            return value.timestamp()
        return None

    def set_primary_source(self, source: URIRef) -> None:
        """Replace the primary source used for subsequent additions."""
        self.source = source