Coverage for heritrace/editor.py: 100%

158 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-06-24 11:39 +0000

1import traceback 

2from datetime import datetime 

3from typing import List, Set 

4 

5from heritrace.extensions import SPARQLWrapperWithRetry 

6from rdflib import Graph, Literal, URIRef 

7from rdflib_ocdm.counter_handler.counter_handler import CounterHandler 

8from rdflib_ocdm.ocdm_graph import OCDMConjunctiveGraph, OCDMGraph 

9from rdflib_ocdm.reader import Reader 

10from rdflib_ocdm.storer import Storer 

11from SPARQLWrapper import JSON 

12 

13 

class Editor:
    """High-level editor for an RDF dataset with OCDM provenance tracking.

    Changes are staged locally in an ``OCDMConjunctiveGraph`` (quadstore)
    or ``OCDMGraph`` (plain triplestore) and written back, together with
    the generated provenance, by :meth:`save`.
    """

    def __init__(
        self,
        dataset_endpoint: str,
        provenance_endpoint: str,
        counter_handler: CounterHandler,
        resp_agent: URIRef,
        source: URIRef | None = None,
        c_time: datetime | str | None = None,
        dataset_is_quadstore: bool = True,
    ):
        """Set up the editor and its local graph set.

        Args:
            dataset_endpoint: SPARQL endpoint of the dataset triplestore.
            provenance_endpoint: SPARQL endpoint for provenance graphs.
            counter_handler: Counter handler used by the OCDM graph set.
            resp_agent: URI of the agent responsible for the edits.
            source: Optional primary-source URI recorded with each edit.
            c_time: Optional creation time (datetime or ISO 8601 string),
                stored internally as a POSIX timestamp.
            dataset_is_quadstore: True if the dataset supports named graphs.
        """
        self.dataset_endpoint = dataset_endpoint
        self.provenance_endpoint = provenance_endpoint
        self.counter_handler = counter_handler
        self.resp_agent = resp_agent
        self.source = source
        self.c_time = self.to_posix_timestamp(c_time)
        self.dataset_is_quadstore = dataset_is_quadstore
        self.g_set = (
            OCDMConjunctiveGraph(self.counter_handler)
            if self.dataset_is_quadstore
            else OCDMGraph(self.counter_handler)
        )

    def _normalize_params(
        self, subject, predicate=None, graph=None
    ) -> tuple[URIRef, URIRef | None, URIRef | None]:
        """Normalize the common parameters of graph operations.

        Coerces ``subject`` and ``predicate`` to ``URIRef`` and reduces
        ``graph`` to its identifier (``Graph`` -> ``graph.identifier``,
        ``str`` -> ``URIRef``).
        """
        if not isinstance(subject, URIRef):
            subject = URIRef(subject)

        if predicate is not None and not isinstance(predicate, URIRef):
            predicate = URIRef(predicate)

        if graph is not None:
            if isinstance(graph, Graph):
                # NOTE(review): a Graph identifier is usually a URIRef but
                # may be a BNode for anonymous graphs — confirm callers
                # always pass named graphs.
                graph = graph.identifier
            elif isinstance(graph, str):
                graph = URIRef(graph)

        return subject, predicate, graph

    def create(
        self,
        subject: URIRef,
        predicate: URIRef,
        value: Literal | URIRef,
        graph: URIRef | Graph | str | None = None,
    ) -> None:
        """Add a new triple (or quad, when a graph is given on a quadstore)."""
        subject, predicate, graph = self._normalize_params(subject, predicate, graph)

        if self.dataset_is_quadstore and graph:
            self.g_set.add(
                (subject, predicate, value, graph),
                resp_agent=self.resp_agent,
                primary_source=self.source,
            )
        else:
            self.g_set.add(
                (subject, predicate, value),
                resp_agent=self.resp_agent,
                primary_source=self.source,
            )

    def update(
        self,
        subject: URIRef,
        predicate: URIRef,
        old_value: Literal | URIRef,
        new_value: Literal | URIRef,
        graph: URIRef | Graph | str | None = None,
    ) -> None:
        """Replace an existing triple's object with a new value.

        Raises:
            Exception: If the triple/quad with ``old_value`` does not exist.
        """
        subject, predicate, graph = self._normalize_params(subject, predicate, graph)

        # The old triple must exist before it can be replaced.
        if self.dataset_is_quadstore and graph:
            if (subject, predicate, old_value, graph) not in self.g_set:
                raise Exception(
                    f"Triple ({subject}, {predicate}, {old_value}, {graph}) does not exist"
                )
            self.g_set.remove((subject, predicate, old_value, graph))
            self.g_set.add(
                (subject, predicate, new_value, graph),
                resp_agent=self.resp_agent,
                primary_source=self.source,
            )
        else:
            if (subject, predicate, old_value) not in self.g_set:
                raise Exception(
                    f"Triple ({subject}, {predicate}, {old_value}) does not exist"
                )
            self.g_set.remove((subject, predicate, old_value))
            self.g_set.add(
                (subject, predicate, new_value),
                resp_agent=self.resp_agent,
                primary_source=self.source,
            )

    def delete(
        self,
        subject: URIRef,
        predicate: URIRef | None = None,
        value: Literal | URIRef | None = None,
        graph: URIRef | Graph | str | None = None,
    ) -> None:
        """Delete an entity, a specific triple, or all values of a predicate.

        With only ``subject``: removes the whole entity (including incoming
        references) and marks it as deleted. With ``predicate``: removes
        either the specific ``value`` or every value of that predicate; the
        entity is marked as deleted when nothing remains.

        Raises:
            Exception: If the targeted entity/triple does not exist.
        """
        subject, predicate, graph = self._normalize_params(subject, predicate, graph)

        if predicate is None:
            # Delete the entire entity, after checking it exists.
            if self.dataset_is_quadstore:
                quads = list(self.g_set.quads((subject, None, None, None)))
                if not quads:
                    raise Exception(f"Entity {subject} does not exist")
                for quad in quads:
                    self.g_set.remove(quad)

                # Also remove any quads where this entity is the object.
                object_quads = list(self.g_set.quads((None, None, subject, None)))
                for quad in object_quads:
                    self.g_set.remove(quad)
            else:
                triples = list(self.g_set.triples((subject, None, None)))
                if not triples:
                    raise Exception(f"Entity {subject} does not exist")
                for triple in triples:
                    self.g_set.remove(triple)

                # Also remove any triples where this entity is the object.
                object_triples = list(self.g_set.triples((None, None, subject)))
                for triple in object_triples:
                    self.g_set.remove(triple)
            self.g_set.mark_as_deleted(subject)
        else:
            # BUGFIX: explicit None check — `if value:` treated falsy
            # literals such as Literal("") or Literal(0) as "no value" and
            # wiped every value of the predicate instead of just one.
            if value is not None:
                # Remove one specific triple/quad, verifying it exists first.
                if self.dataset_is_quadstore and graph:
                    if (subject, predicate, value, graph) not in self.g_set:
                        raise Exception(
                            f"Triple ({subject}, {predicate}, {value}, {graph}) does not exist"
                        )
                    self.g_set.remove((subject, predicate, value, graph))
                else:
                    if (subject, predicate, value) not in self.g_set:
                        raise Exception(
                            f"Triple ({subject}, {predicate}, {value}) does not exist"
                        )
                    self.g_set.remove((subject, predicate, value))
            else:
                # Remove every value of the given predicate.
                if self.dataset_is_quadstore and graph:
                    quads = list(self.g_set.quads((subject, predicate, None, graph)))
                    if not quads:
                        raise Exception(
                            f"No triples found with subject {subject} and predicate {predicate} in graph {graph}"
                        )
                    for quad in quads:
                        self.g_set.remove(quad)
                else:
                    triples = list(self.g_set.triples((subject, predicate, None)))
                    if not triples:
                        raise Exception(
                            f"No triples found with subject {subject} and predicate {predicate}"
                        )
                    for triple in triples:
                        self.g_set.remove(triple)

            # If the entity lost its last triple, mark it as deleted.
            if len(list(self.g_set.triples((subject, None, None)))) == 0:
                self.g_set.mark_as_deleted(subject)

    def import_entity(self, subject):
        """Load an entity's current state from the triplestore into g_set."""
        Reader.import_entities_from_triplestore(
            self.g_set, self.dataset_endpoint, [subject]
        )

    def merge(self, keep_entity_uri: str, delete_entity_uri: str) -> None:
        """
        Merges one entity into another within the dataset.

        The delete_entity_uri will be removed, and its properties and
        incoming references will be transferred to keep_entity_uri.
        All operations are performed within the local graph set managed by
        this Editor instance and then saved, ensuring provenance capture.

        Args:
            keep_entity_uri: The URI of the entity to keep.
            delete_entity_uri: The URI of the entity to delete and merge from.

        Raises:
            ValueError: If keep_entity_uri and delete_entity_uri are the same.
            Exception: If errors occur during SPARQL queries or graph operations.
        """
        keep_uri, _, _ = self._normalize_params(keep_entity_uri)
        delete_uri, _, _ = self._normalize_params(delete_entity_uri)

        if keep_uri == delete_uri:
            raise ValueError("Cannot merge an entity with itself.")

        sparql = SPARQLWrapperWithRetry(self.dataset_endpoint)
        entities_to_import: Set[URIRef] = {keep_uri, delete_uri}
        incoming_triples_to_update: List[tuple[URIRef, URIRef]] = []
        outgoing_triples_to_move: List[tuple[URIRef, Literal | URIRef]] = []

        try:
            # 1. Find incoming references to delete_uri
            # We fetch subjects and predicates pointing to the entity to be deleted.
            query_incoming = f"SELECT DISTINCT ?s ?p WHERE {{ ?s ?p <{delete_uri}> . FILTER (?s != <{keep_uri}>) }}"
            sparql.setQuery(query_incoming)
            sparql.setReturnFormat(JSON)
            results_incoming = sparql.query().convert()
            for result in results_incoming["results"]["bindings"]:
                s_uri = URIRef(result["s"]["value"])
                p_uri = URIRef(result["p"]["value"])
                incoming_triples_to_update.append((s_uri, p_uri))
                entities_to_import.add(s_uri)  # Ensure referencing entities are loaded

            # 2. Find outgoing properties from delete_uri (excluding rdf:type)
            # We fetch predicates and objects of the entity to be deleted.
            query_outgoing = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                SELECT DISTINCT ?p ?o WHERE {{
                    <{delete_uri}> ?p ?o .
                    FILTER (?p != rdf:type)
                }}
            """
            sparql.setQuery(query_outgoing)
            sparql.setReturnFormat(JSON)
            results_outgoing = sparql.query().convert()
            for result in results_outgoing["results"]["bindings"]:
                p_uri = URIRef(result["p"]["value"])
                o_node = result["o"]
                o_val: Literal | URIRef | None = None
                if o_node["type"] == "uri":
                    o_val = URIRef(o_node["value"])
                    entities_to_import.add(o_val)  # Ensure referenced entities are loaded
                elif o_node["type"] in {"literal", "typed-literal"}:
                    o_val = Literal(o_node["value"], lang=o_node.get("xml:lang"), datatype=URIRef(o_node["datatype"]) if o_node.get("datatype") else None)
                else:  # bnode? Skip for now or handle if necessary
                    print(f"Warning: Skipping non-URI/Literal object type '{o_node['type']}' from {delete_uri} via {p_uri}")
                    continue
                if o_val:
                    outgoing_triples_to_move.append((p_uri, o_val))

            # 3. Import all involved entities into the local graph set
            # This brings the current state of these entities from the triplestore
            # into the Editor's context for modification.
            if entities_to_import:
                Reader.import_entities_from_triplestore(
                    self.g_set, self.dataset_endpoint, list(entities_to_import)
                )
                # Mark the start of modifications if using preexisting_finished pattern
                self.g_set.preexisting_finished(self.resp_agent, self.source, self.c_time)

            # 4. Perform the merge using the built-in function
            # This function handles moving triples and updating the internal
            # merge_index and entity_index for provenance generation.
            self.g_set.merge(keep_uri, delete_uri)

            # 5. Save changes and provenance
            # This uploads the modified local graph and the generated provenance graph.
            self.save()

        except Exception as e:
            print(f"Error during merge operation for {keep_uri} and {delete_uri}: {e}")
            print(traceback.format_exc())
            # Avoid committing partial changes by not calling save()
            raise  # Re-raise the exception to signal failure

    def preexisting_finished(self):
        """Mark the imported (pre-existing) state as the modification baseline."""
        self.g_set.preexisting_finished(self.resp_agent, self.source, self.c_time)

    def save(self):
        """Generate provenance, upload dataset and provenance, commit locally."""
        self.g_set.generate_provenance()
        dataset_storer = Storer(self.g_set)
        prov_storer = Storer(self.g_set.provenance)
        dataset_storer.upload_all(self.dataset_endpoint)
        prov_storer.upload_all(self.provenance_endpoint)
        self.g_set.commit_changes()

    def to_posix_timestamp(self, value: str | datetime | None) -> float | None:
        """Convert a datetime or ISO 8601 string to a POSIX timestamp.

        Returns None for None input. Naive datetimes/strings are interpreted
        in the local timezone by ``datetime.timestamp``.

        Raises:
            ValueError: If a string is not valid ISO 8601.
        """
        if value is None:
            return None
        elif isinstance(value, datetime):
            return value.timestamp()
        elif isinstance(value, str):
            dt = datetime.fromisoformat(value)
            return dt.timestamp()
        # Any other type falls through to None, matching prior behavior.
        return None

    def set_primary_source(self, source: str | URIRef) -> None:
        """
        Set the primary source for this editor instance.

        This will affect all future operations performed by this editor.

        Args:
            source: The primary source URI to use
        """
        if source:
            if not isinstance(source, URIRef):
                source = URIRef(source)
            self.source = source