Coverage for oc_meta/plugins/editor.py: 67% (232 statements)


#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import os
import re
from time import sleep, time
from typing import Set

import validators
import yaml
from oc_ocdm import Storer
from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
from oc_ocdm.graph import GraphSet
from oc_ocdm.graph.graph_entity import GraphEntity
from oc_ocdm.prov import ProvSet
from oc_ocdm.reader import Reader
from oc_ocdm.support.support import build_graph_from_results
from rdflib import RDF, ConjunctiveGraph, URIRef
from SPARQLWrapper import JSON, SPARQLWrapper

class EntityCache:
    def __init__(self):
        self.cache: Set[URIRef] = set()

    def add(self, entity: URIRef) -> None:
        """Add an entity to the cache."""
        self.cache.add(entity)

    def is_cached(self, entity: URIRef) -> bool:
        """Check if an entity is in the cache."""
        return entity in self.cache

    def clear(self) -> None:
        """Clear all cached entities."""
        self.cache.clear()
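
# A minimal usage sketch for EntityCache (the URI below is hypothetical):
#
#     cache = EntityCache()
#     cache.add(URIRef("https://w3id.org/oc/meta/br/0601"))
#     cache.is_cached(URIRef("https://w3id.org/oc/meta/br/0601"))  # True
#     cache.clear()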


class MetaEditor:
    property_to_remove_method = {
        "http://purl.org/spar/datacite/hasIdentifier": "remove_identifier",
        "http://purl.org/spar/pro/isHeldBy": "remove_is_held_by",
        "http://purl.org/vocab/frbr/core#embodiment": "remove_format",
        "http://purl.org/spar/pro/isDocumentContextFor": "remove_is_held_by",
        "https://w3id.org/oc/ontology/hasNext": "remove_next",
    }

    def __init__(self, meta_config: str, resp_agent: str, save_queries: bool = False):
        with open(meta_config, encoding="utf-8") as file:
            settings = yaml.full_load(file)
        self.endpoint = settings["triplestore_url"]
        self.provenance_endpoint = settings["provenance_triplestore_url"]
        output_dir = settings.get("base_output_dir")
        self.base_dir = os.path.join(output_dir, "rdf") + os.sep
        self.base_iri = settings["base_iri"]
        self.resp_agent = resp_agent
        self.dir_split = settings["dir_split_number"]
        self.n_file_item = settings["items_per_file"]
        self.zip_output_rdf = settings["zip_output_rdf"]
        self.generate_rdf_files = settings.get("generate_rdf_files", True)
        self.reader = Reader()
        self.save_queries = save_queries
        self.update_queries = []

        # Redis configuration
        self.redis_host = settings.get("redis_host", "localhost")
        self.redis_port = settings.get("redis_port", 6379)
        self.redis_db = settings.get("redis_db", 5)
        self.counter_handler = RedisCounterHandler(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )

        self.entity_cache = EntityCache()
        self.relationship_cache = {}
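
    # A hypothetical construction sketch (the config path and agent URI are
    # illustrative, not taken from the codebase):
    #
    #     editor = MetaEditor(
    #         meta_config="meta_config.yaml",
    #         resp_agent="https://orcid.org/0000-0000-0000-0000",
    #     )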

    def make_sparql_query_with_retry(
        self, sparql: SPARQLWrapper, query, max_retries=5, backoff_factor=0.3
    ):
        sparql.setQuery(query)
        sparql.setReturnFormat(JSON)

        start_time = time()
        for attempt in range(max_retries):
            try:
                return sparql.queryAndConvert()
            except Exception as e:
                duration = time() - start_time
                if attempt < max_retries - 1:
                    # Exponential backoff: wait backoff_factor * 2^attempt seconds
                    sleep_time = backoff_factor * (2**attempt)
                    print(
                        f"SPARQL query failed after {duration:.2f}s, "
                        f"retry {attempt + 1}/{max_retries}: {query}"
                    )
                    sleep(sleep_time)
                else:
                    print(f"SPARQL query failed after {max_retries} attempts: {e}")
                    raise
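
    # With the defaults (max_retries=5, backoff_factor=0.3), the sleep schedule
    # between attempts is backoff_factor * 2^attempt seconds:
    #
    #     attempt 0 -> 0.3s, 1 -> 0.6s, 2 -> 1.2s, 3 -> 2.4s
    #
    # and the fifth failure re-raises the underlying exception.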

    def update_property(
        self, res: URIRef, property: str, new_value: str | URIRef
    ) -> None:
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        self.reader.import_entity_from_triplestore(
            g_set, self.endpoint, res, self.resp_agent, enable_validation=False
        )
        if validators.url(new_value):
            # The new value is itself an entity: import it and link the two
            new_value = URIRef(new_value)
            self.reader.import_entity_from_triplestore(
                g_set,
                self.endpoint,
                new_value,
                self.resp_agent,
                enable_validation=False,
            )
            getattr(g_set.get_entity(res), property)(g_set.get_entity(new_value))
        else:
            getattr(g_set.get_entity(res), property)(new_value)
        self.save(g_set, supplier_prefix)
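
    # A hedged usage sketch: `property` is the name of an oc_ocdm setter on the
    # imported entity (e.g. "has_title" on a bibliographic resource); the URI is
    # hypothetical:
    #
    #     editor.update_property(
    #         URIRef("https://w3id.org/oc/meta/br/0601"), "has_title", "New title"
    #     )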

    def delete(
        self, res: str, property: str | None = None, object: str | None = None
    ) -> None:
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res, self.resp_agent, enable_validation=False
            )
        except ValueError as e:
            print(f"ValueError for entity {res}: {e}")
            inferred_type = self.infer_type_from_uri(res)
            if inferred_type:
                print(f"Inferred type {inferred_type} for entity {res}")
                sparql: SPARQLWrapper = SPARQLWrapper(self.endpoint)
                query: str = (
                    f"SELECT ?s ?p ?o WHERE {{BIND (<{res}> AS ?s). ?s ?p ?o.}}"
                )
                sparql.setQuery(query)
                sparql.setMethod("GET")
                sparql.setReturnFormat(JSON)
                result = sparql.queryAndConvert()["results"]["bindings"]
                preexisting_graph: ConjunctiveGraph = build_graph_from_results(result)
                self.add_entity_with_type(g_set, res, inferred_type, preexisting_graph)
            else:
                return
        if not g_set.get_entity(URIRef(res)):
            return
        if property:
            # Resolve the remover method: known property IRIs are mapped explicitly;
            # otherwise "has_x" becomes "remove_x" and anything else gets a
            # "remove_" prefix
            remove_method = (
                self.property_to_remove_method[property]
                if property in self.property_to_remove_method
                else (
                    property.replace("has", "remove")
                    if property.startswith("has")
                    else f"remove_{property}"
                )
            )
            if object:
                if validators.url(object):
                    self.reader.import_entity_from_triplestore(
                        g_set,
                        self.endpoint,
                        object,
                        self.resp_agent,
                        enable_validation=False,
                    )
                    # Known issue: some removers (e.g. AgentRole.remove_is_held_by)
                    # take no positional argument and raise TypeError when one is passed
                    getattr(g_set.get_entity(URIRef(res)), remove_method)(
                        g_set.get_entity(URIRef(object))
                    )
                else:
                    getattr(g_set.get_entity(URIRef(res)), remove_method)(object)
            else:
                getattr(g_set.get_entity(URIRef(res)), remove_method)()
        else:
            # No property given: purge the entity itself, importing its referencing
            # subjects first so their dangling triples are updated on save
            sparql = SPARQLWrapper(endpoint=self.endpoint)
            query = f"SELECT ?s WHERE {{?s ?p <{res}>.}}"
            sparql.setQuery(query)
            sparql.setReturnFormat(JSON)
            result = sparql.queryAndConvert()
            for entity in result["results"]["bindings"]:
                self.reader.import_entity_from_triplestore(
                    g_set,
                    self.endpoint,
                    URIRef(entity["s"]["value"]),
                    self.resp_agent,
                    enable_validation=False,
                )
            entity_to_purge = g_set.get_entity(URIRef(res))
            entity_to_purge.mark_as_to_be_deleted()
        self.save(g_set, supplier_prefix)
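
    # Three deletion modes, sketched with hypothetical arguments:
    #
    #     editor.delete(res, "has_title")            # drop a literal property
    #     editor.delete(res, property_iri, obj_uri)  # drop a link to another entity
    #     editor.delete(res)                         # purge the whole entity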

    def merge(self, g_set: GraphSet, res: URIRef, other: URIRef) -> None:
        """
        Merge two entities and their related entities using batch import with caching.

        Args:
            g_set: The GraphSet containing the entities
            res: The main entity that will absorb the other
            other: The entity to be merged into the main one
        """
        # First get all related entities with a single SPARQL query
        related_entities = set()
        if other in self.relationship_cache:
            related_entities.update(self.relationship_cache[other])
        else:
            sparql = SPARQLWrapper(endpoint=self.endpoint)
            query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    {{?entity ?p <{other}>}} UNION
                    {{<{other}> ?p ?entity}}
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}"""

            data = self.make_sparql_query_with_retry(sparql, query)
            other_related = {
                URIRef(result["entity"]["value"])
                for result in data["results"]["bindings"]
                if result["entity"]["type"] == "uri"
            }

            self.relationship_cache[other] = other_related
            related_entities.update(other_related)
        if res in self.relationship_cache:
            related_entities.update(self.relationship_cache[res])
        else:
            # Query only for objects of the surviving entity if not in cache
            sparql = SPARQLWrapper(endpoint=self.endpoint)
            query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    <{res}> ?p ?entity
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}"""

            data = self.make_sparql_query_with_retry(sparql, query)
            res_related = {
                URIRef(result["entity"]["value"])
                for result in data["results"]["bindings"]
                if result["entity"]["type"] == "uri"
            }

            self.relationship_cache[res] = res_related
            related_entities.update(res_related)

        entities_to_import = {res, other}
        entities_to_import.update(related_entities)
        entities_to_import = {
            e for e in entities_to_import if not self.entity_cache.is_cached(e)
        }
        # Import only non-cached entities if there are any
        if entities_to_import:
            try:
                self.reader.import_entities_from_triplestore(
                    g_set=g_set,
                    ts_url=self.endpoint,
                    entities=list(entities_to_import),
                    resp_agent=self.resp_agent,
                    enable_validation=False,
                    batch_size=10,
                )

                # Add newly imported entities to cache
                for entity in entities_to_import:
                    self.entity_cache.add(entity)

            except ValueError as e:
                print(f"Error importing entities: {e}")
                return

        # Perform the merge
        res_as_entity = g_set.get_entity(res)
        other_as_entity = g_set.get_entity(other)

        is_both_expression = all(
            GraphEntity.iri_expression in entity.g.objects(entity.res, RDF.type)
            for entity in [res_as_entity, other_as_entity]
        )

        if is_both_expression:
            # Both are fabio:Expression: keep the surviving entity's values on conflict
            res_as_entity.merge(other_as_entity, prefer_self=True)
        else:
            res_as_entity.merge(other_as_entity)
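
    # A merge sketch (URIs are hypothetical). The caller owns the GraphSet and
    # persists the result afterwards:
    #
    #     g_set = GraphSet(editor.base_iri, custom_counter_handler=editor.counter_handler)
    #     editor.merge(g_set, URIRef("https://w3id.org/oc/meta/ra/0601"),
    #                  URIRef("https://w3id.org/oc/meta/ra/0602"))
    #     editor.save(g_set)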

    def sync_rdf_with_triplestore(self, res: str, source_uri: str | None = None) -> bool:
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res, self.resp_agent, enable_validation=False
            )
            self.save(g_set, supplier_prefix)
            return True
        except ValueError:
            try:
                self.reader.import_entity_from_triplestore(
                    g_set,
                    self.endpoint,
                    source_uri,
                    self.resp_agent,
                    enable_validation=False,
                )
            except ValueError:
                # Neither entity is in the triplestore: fall back to the RDF files
                # and strip any stale triples about source_uri from them
                res_filepath = self.find_file(
                    self.base_dir,
                    self.dir_split,
                    self.n_file_item,
                    source_uri,
                    self.zip_output_rdf,
                )
                if not res_filepath:
                    return False
                imported_graph = self.reader.load(res_filepath)
                self.reader.import_entities_from_graph(
                    g_set, imported_graph, self.resp_agent
                )
                res_entity = g_set.get_entity(URIRef(source_uri))
                if res_entity:
                    for _, entity in g_set.res_to_entity.items():
                        triples_list = list(
                            entity.g.triples((URIRef(source_uri), None, None))
                        )
                        for triple in triples_list:
                            entity.g.remove(triple)
                    self.save(g_set, supplier_prefix)
            return False
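
    # Return semantics: True means the entity was found in the triplestore and
    # re-serialized; False means it was absent and, where possible, its stale
    # triples were removed from the local RDF files instead.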

    def save(self, g_set: GraphSet, supplier_prefix: str = "") -> None:
        provset = ProvSet(
            g_set,
            self.base_iri,
            wanted_label=False,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        provset.generate_provenance()
        graph_storer = Storer(
            g_set,
            dir_split=self.dir_split,
            n_file_item=self.n_file_item,
            zip_output=self.zip_output_rdf,
        )
        prov_storer = Storer(
            provset,
            dir_split=self.dir_split,
            n_file_item=self.n_file_item,
            zip_output=self.zip_output_rdf,
        )

        if self.generate_rdf_files:
            graph_storer.store_all(self.base_dir, self.base_iri)
            prov_storer.store_all(self.base_dir, self.base_iri)

        graph_storer.upload_all(
            self.endpoint, base_dir=self.base_dir, save_queries=self.save_queries
        )
        prov_storer.upload_all(
            self.provenance_endpoint,
            base_dir=self.base_dir,
            save_queries=self.save_queries,
        )
        g_set.commit_changes()

    def __get_supplier_prefix(self, uri: str) -> str:
        entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?([1-9][0-9]*)$"
        entity_match = re.match(entity_regex, uri)
        return entity_match.group(3)
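
    # Worked example: for "https://w3id.org/oc/meta/br/0601234" the regex groups
    # are ("https://w3id.org/oc/meta", "br", "060", "1234"), so the supplier
    # prefix returned is "060".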

    def find_file(
        self,
        rdf_dir: str,
        dir_split_number: int,
        items_per_file: int,
        uri: str,
        zip_output_rdf: bool,
    ) -> str | None:
        entity_regex: str = (
            r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
        )
        entity_match = re.match(entity_regex, uri)
        if entity_match:
            cur_number = int(entity_match.group(4))
            # Round the entity number up to the next multiple of items_per_file
            # (the file bucket) and of dir_split_number (the directory bucket)
            cur_file_split: int = 0
            while True:
                if cur_number > cur_file_split:
                    cur_file_split += items_per_file
                else:
                    break
            cur_split: int = 0
            while True:
                if cur_number > cur_split:
                    cur_split += dir_split_number
                else:
                    break
            short_name = entity_match.group(2)
            sub_folder = entity_match.group(3)
            cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
            extension = ".zip" if zip_output_rdf else ".json"
            cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + extension
            return cur_file_path
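
    # Worked example, assuming dir_split_number=10000 and items_per_file=1000:
    # for ".../br/0601234", cur_number is 1234, the file bucket rounds up to
    # 2000 and the directory bucket to 10000, giving
    # "<rdf_dir>/br/060/10000/2000.zip" (or ".json" without zipping).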

    def infer_type_from_uri(self, uri: str) -> str | None:
        # OC Meta URI abbreviations: br = bibliographic resource, ar = agent role,
        # ra = responsible agent, re = resource embodiment, id = identifier
        if os.path.join(self.base_iri, "br") in uri:
            return GraphEntity.iri_expression
        elif os.path.join(self.base_iri, "ar") in uri:
            return GraphEntity.iri_role_in_time
        elif os.path.join(self.base_iri, "ra") in uri:
            return GraphEntity.iri_agent
        elif os.path.join(self.base_iri, "re") in uri:
            return GraphEntity.iri_manifestation
        elif os.path.join(self.base_iri, "id") in uri:
            return GraphEntity.iri_identifier
        return None

    def add_entity_with_type(
        self,
        g_set: GraphSet,
        res: str,
        entity_type: str,
        preexisting_graph: ConjunctiveGraph,
    ):
        subject = URIRef(res)
        if entity_type == GraphEntity.iri_expression:
            g_set.add_br(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_role_in_time:
            g_set.add_ar(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_agent:
            g_set.add_ra(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_manifestation:
            g_set.add_re(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_identifier:
            g_set.add_id(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
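

# A minimal end-to-end sketch of the editor workflow (config path, agent URI,
# and entity URIs are all hypothetical):
#
#     editor = MetaEditor("meta_config.yaml", "https://orcid.org/0000-0000-0000-0000")
#     editor.update_property(URIRef("https://w3id.org/oc/meta/br/0601"),
#                            "has_title", "Corrected title")
#     editor.delete("https://w3id.org/oc/meta/id/0602")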