Coverage for heritrace/editor.py: 100%

157 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-18 11:10 +0000

1from datetime import datetime 

2import argparse 

3import concurrent.futures 

4import csv 

5import os 

6import traceback 

7from typing import Dict, List, Set 

8 

9from rdflib import Graph, Literal, URIRef 

10from rdflib_ocdm.counter_handler.counter_handler import CounterHandler 

11from rdflib_ocdm.ocdm_graph import OCDMConjunctiveGraph, OCDMGraph 

12from rdflib_ocdm.reader import Reader 

13from rdflib_ocdm.storer import Storer 

14from SPARQLWrapper import SPARQLWrapper, JSON 

15from tqdm import tqdm 

16 

17 

18class Editor: 

    def __init__(
        self,
        dataset_endpoint: str,
        provenance_endpoint: str,
        counter_handler: CounterHandler,
        resp_agent: URIRef,
        source: URIRef | None = None,
        c_time: datetime | None = None,
        dataset_is_quadstore: bool = True,
    ):
        """Initialize an Editor that edits a dataset and records provenance.

        Args:
            dataset_endpoint: SPARQL endpoint URL of the dataset triplestore.
            provenance_endpoint: SPARQL endpoint URL where provenance is uploaded.
            counter_handler: Counter handler passed to the OCDM graph set.
            resp_agent: URI of the agent responsible for the edits.
            source: Optional primary source URI attached to every change.
            c_time: Optional creation time; stored as a POSIX timestamp
                (see ``to_posix_timestamp``).
            dataset_is_quadstore: When True the dataset is treated as a
                quadstore (named graphs) and an OCDMConjunctiveGraph is used;
                otherwise a plain OCDMGraph.
        """
        self.dataset_endpoint = dataset_endpoint
        self.provenance_endpoint = provenance_endpoint
        self.counter_handler = counter_handler
        self.resp_agent = resp_agent
        self.source = source
        # Normalized to a POSIX timestamp (float) or None.
        self.c_time = self.to_posix_timestamp(c_time)
        self.dataset_is_quadstore = dataset_is_quadstore
        # Quadstores need a conjunctive (named-graph aware) graph set.
        self.g_set = (
            OCDMConjunctiveGraph(self.counter_handler)
            if self.dataset_is_quadstore
            else OCDMGraph(self.counter_handler)
        )

41 

42 def _normalize_params(self, subject, predicate=None, graph=None) -> tuple[URIRef, URIRef | None, URIRef | Graph | str | None]: 

43 """Normalizza i parametri comuni per le operazioni sui grafi.""" 

44 # Normalizza il soggetto 

45 if not isinstance(subject, URIRef): 

46 subject = URIRef(subject) 

47 

48 # Normalizza il predicato se fornito 

49 if predicate is not None and not isinstance(predicate, URIRef): 

50 predicate = URIRef(predicate) 

51 

52 # Normalizza il grafo se fornito 

53 if graph is not None: 

54 if isinstance(graph, Graph): 

55 graph = graph.identifier 

56 elif isinstance(graph, str): 

57 graph = URIRef(graph) 

58 

59 return subject, predicate, graph 

60 

61 def create( 

62 self, 

63 subject: URIRef, 

64 predicate: URIRef, 

65 value: Literal | URIRef, 

66 graph: URIRef | Graph | str = None, 

67 ) -> None: 

68 # Normalizza i parametri 

69 subject, predicate, graph = self._normalize_params(subject, predicate, graph) 

70 

71 if self.dataset_is_quadstore and graph: 

72 self.g_set.add( 

73 (subject, predicate, value, graph), 

74 resp_agent=self.resp_agent, 

75 primary_source=self.source, 

76 ) 

77 else: 

78 self.g_set.add( 

79 (subject, predicate, value), 

80 resp_agent=self.resp_agent, 

81 primary_source=self.source, 

82 ) 

83 

84 def update( 

85 self, 

86 subject: URIRef, 

87 predicate: URIRef, 

88 old_value: Literal | URIRef, 

89 new_value: Literal | URIRef, 

90 graph: URIRef | Graph | str = None, 

91 ) -> None: 

92 # Normalizza i parametri 

93 subject, predicate, graph = self._normalize_params(subject, predicate, graph) 

94 

95 # Check if the triple exists before updating 

96 if self.dataset_is_quadstore and graph: 

97 if not (subject, predicate, old_value, graph) in self.g_set: 

98 raise Exception( 

99 f"Triple ({subject}, {predicate}, {old_value}, {graph}) does not exist" 

100 ) 

101 self.g_set.remove((subject, predicate, old_value, graph)) 

102 self.g_set.add( 

103 (subject, predicate, new_value, graph), 

104 resp_agent=self.resp_agent, 

105 primary_source=self.source, 

106 ) 

107 else: 

108 if not (subject, predicate, old_value) in self.g_set: 

109 raise Exception( 

110 f"Triple ({subject}, {predicate}, {old_value}) does not exist" 

111 ) 

112 self.g_set.remove((subject, predicate, old_value)) 

113 self.g_set.add( 

114 (subject, predicate, new_value), 

115 resp_agent=self.resp_agent, 

116 primary_source=self.source, 

117 ) 

118 

119 def delete( 

120 self, 

121 subject: URIRef, 

122 predicate: URIRef = None, 

123 value=None, 

124 graph: URIRef | Graph | str = None, 

125 ) -> None: 

126 # Normalizza i parametri 

127 subject, predicate, graph = self._normalize_params(subject, predicate, graph) 

128 

129 if predicate is None: 

130 # Delete the entire entity 

131 # Check if the entity exists 

132 if self.dataset_is_quadstore: 

133 quads = list(self.g_set.quads((subject, None, None, None))) 

134 if not quads: 

135 raise Exception(f"Entity {subject} does not exist") 

136 for quad in quads: 

137 self.g_set.remove(quad) 

138 

139 # Also remove any triples where this entity is the object 

140 object_quads = list(self.g_set.quads((None, None, subject, None))) 

141 for quad in object_quads: 

142 self.g_set.remove(quad) 

143 else: 

144 triples = list(self.g_set.triples((subject, None, None))) 

145 if not triples: 

146 raise Exception(f"Entity {subject} does not exist") 

147 for triple in triples: 

148 self.g_set.remove(triple) 

149 

150 # Also remove any triples where this entity is the object 

151 object_triples = list(self.g_set.triples((None, None, subject))) 

152 for triple in object_triples: 

153 self.g_set.remove(triple) 

154 self.g_set.mark_as_deleted(subject) 

155 else: 

156 if value: 

157 # Check if the specific triple/quad exists before removing it 

158 if self.dataset_is_quadstore and graph: 

159 if not (subject, predicate, value, graph) in self.g_set: 

160 raise Exception( 

161 f"Triple ({subject}, {predicate}, {value}, {graph}) does not exist" 

162 ) 

163 self.g_set.remove((subject, predicate, value, graph)) 

164 else: 

165 if not (subject, predicate, value) in self.g_set: 

166 raise Exception( 

167 f"Triple ({subject}, {predicate}, {value}) does not exist" 

168 ) 

169 self.g_set.remove((subject, predicate, value)) 

170 else: 

171 # Check if any triples with the given subject and predicate exist 

172 if self.dataset_is_quadstore and graph: 

173 quads = list(self.g_set.quads((subject, predicate, None, graph))) 

174 if not quads: 

175 raise Exception( 

176 f"No triples found with subject {subject} and predicate {predicate} in graph {graph}" 

177 ) 

178 for quad in quads: 

179 self.g_set.remove(quad) 

180 else: 

181 triples = list(self.g_set.triples((subject, predicate, None))) 

182 if not triples: 

183 raise Exception( 

184 f"No triples found with subject {subject} and predicate {predicate}" 

185 ) 

186 for triple in triples: 

187 self.g_set.remove(triple) 

188 

189 # Check if the entity is now empty and mark it as deleted if so 

190 if len(list(self.g_set.triples((subject, None, None)))) == 0: 

191 self.g_set.mark_as_deleted(subject) 

192 

    def import_entity(self, subject):
        """Load the current state of *subject* from the dataset triplestore
        into the local graph set, so it can be edited with provenance."""
        Reader.import_entities_from_triplestore(
            self.g_set, self.dataset_endpoint, [subject]
        )

197 

    def merge(self, keep_entity_uri: str, delete_entity_uri: str) -> None:
        """
        Merges one entity into another within the dataset.

        The delete_entity_uri will be removed, and its properties and
        incoming references will be transferred to keep_entity_uri.
        All operations are performed within the local graph set managed by
        this Editor instance and then saved, ensuring provenance capture.

        Args:
            keep_entity_uri: The URI of the entity to keep.
            delete_entity_uri: The URI of the entity to delete and merge from.

        Raises:
            ValueError: If keep_entity_uri and delete_entity_uri are the same.
            Exception: If errors occur during SPARQL queries or graph operations.
        """
        keep_uri, _, _ = self._normalize_params(keep_entity_uri)
        delete_uri, _, _ = self._normalize_params(delete_entity_uri)

        if keep_uri == delete_uri:
            raise ValueError("Cannot merge an entity with itself.")

        sparql = SPARQLWrapper(self.dataset_endpoint)
        entities_to_import: Set[URIRef] = {keep_uri, delete_uri}
        incoming_triples_to_update: List[tuple[URIRef, URIRef]] = []
        outgoing_triples_to_move: List[tuple[URIRef, Literal | URIRef]] = []

        try:
            # 1. Find incoming references to delete_uri.
            # We fetch subjects and predicates pointing to the entity to be deleted.
            # NOTE(review): the query is built by f-string interpolation of URIs;
            # both URIs have been normalized to URIRefs, but confirm upstream
            # callers cannot pass attacker-controlled strings (SPARQL injection).
            query_incoming = f"SELECT DISTINCT ?s ?p WHERE {{ ?s ?p <{delete_uri}> . FILTER (?s != <{keep_uri}>) }}"
            sparql.setQuery(query_incoming)
            sparql.setReturnFormat(JSON)
            results_incoming = sparql.query().convert()
            for result in results_incoming["results"]["bindings"]:
                s_uri = URIRef(result["s"]["value"])
                p_uri = URIRef(result["p"]["value"])
                incoming_triples_to_update.append((s_uri, p_uri))
                entities_to_import.add(s_uri)  # Ensure referencing entities are loaded

            # 2. Find outgoing properties from delete_uri (excluding rdf:type).
            # We fetch predicates and objects of the entity to be deleted.
            query_outgoing = f"""
            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
            SELECT DISTINCT ?p ?o WHERE {{
                <{delete_uri}> ?p ?o .
                FILTER (?p != rdf:type)
            }}
            """
            sparql.setQuery(query_outgoing)
            sparql.setReturnFormat(JSON)
            results_outgoing = sparql.query().convert()
            for result in results_outgoing["results"]["bindings"]:
                p_uri = URIRef(result["p"]["value"])
                o_node = result["o"]
                o_val: Literal | URIRef | None = None
                if o_node["type"] == "uri":
                    o_val = URIRef(o_node["value"])
                    entities_to_import.add(o_val)  # Ensure referenced entities are loaded
                elif o_node["type"] in {"literal", "typed-literal"}:
                    # Preserve language tag and datatype when present.
                    o_val = Literal(o_node["value"], lang=o_node.get("xml:lang"), datatype=URIRef(o_node["datatype"]) if o_node.get("datatype") else None)
                else:  # bnode? Skip for now or handle if necessary
                    print(f"Warning: Skipping non-URI/Literal object type '{o_node['type']}' from {delete_uri} via {p_uri}")
                    continue
                if o_val:
                    outgoing_triples_to_move.append((p_uri, o_val))

            # 3. Import all involved entities into the local graph set.
            # This brings the current state of these entities from the triplestore
            # into the Editor's context for modification.
            if entities_to_import:
                Reader.import_entities_from_triplestore(
                    self.g_set, self.dataset_endpoint, list(entities_to_import)
                )
                # Mark the start of modifications if using preexisting_finished pattern.
                self.g_set.preexisting_finished(self.resp_agent, self.source, self.c_time)


            # 4. Perform the merge using the built-in function.
            # This function handles moving triples and updating the internal
            # merge_index and entity_index for provenance generation.
            self.g_set.merge(keep_uri, delete_uri)

            # 5. Save changes and provenance.
            # This uploads the modified local graph and the generated provenance graph.
            self.save()

        except Exception as e:
            print(f"Error during merge operation for {keep_uri} and {delete_uri}: {e}")
            print(traceback.format_exc())
            # Avoid committing partial changes by not calling save()
            raise  # Re-raise the exception to signal failure

291 

    def preexisting_finished(self):
        """Mark the end of the preexisting (imported) state of the graph set,
        so subsequent changes are tracked against it for provenance."""
        self.g_set.preexisting_finished(self.resp_agent, self.source, self.c_time)

294 

    def save(self):
        """Generate provenance, upload data and provenance to their endpoints,
        then commit the pending changes in the local graph set.

        The order matters: provenance must be generated before upload, and
        commit_changes runs only after both uploads.
        """
        self.g_set.generate_provenance()
        dataset_storer = Storer(self.g_set)
        prov_storer = Storer(self.g_set.provenance)
        dataset_storer.upload_all(self.dataset_endpoint)
        prov_storer.upload_all(self.provenance_endpoint)
        self.g_set.commit_changes()

302 

303 def to_posix_timestamp(self, value: str | datetime | None) -> float | None: 

304 if value is None: 

305 return None 

306 elif isinstance(value, datetime): 

307 return value.timestamp() 

308 elif isinstance(value, str): 

309 dt = datetime.fromisoformat(value) 

310 return dt.timestamp()