Coverage for oc_meta/plugins/editor.py: 67%

212 statements  

coverage.py v6.5.0, created at 2025-12-20 08:55 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import os
import re
from typing import Set

import validators
import yaml
from oc_ocdm import Storer
from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
from oc_ocdm.graph import GraphSet
from oc_ocdm.graph.graph_entity import GraphEntity
from oc_ocdm.prov import ProvSet
from oc_ocdm.reader import Reader
from oc_ocdm.support.support import build_graph_from_results
from rdflib import RDF, Graph, URIRef
from sparqlite import SPARQLClient


class EntityCache:
    def __init__(self):
        self.cache: Set[URIRef] = set()

    def add(self, entity: URIRef) -> None:
        """Add an entity to the cache"""
        self.cache.add(entity)

    def is_cached(self, entity: URIRef) -> bool:
        """Check if an entity is in the cache"""
        return entity in self.cache

    def clear(self) -> None:
        """Clear all cached entities"""
        self.cache.clear()


class MetaEditor:
    property_to_remove_method = {
        "http://purl.org/spar/datacite/hasIdentifier": "remove_identifier",
        "http://purl.org/spar/pro/isHeldBy": "remove_is_held_by",
        "http://purl.org/vocab/frbr/core#embodiment": "remove_format",
        "http://purl.org/spar/pro/isDocumentContextFor": "remove_is_held_by",
        "https://w3id.org/oc/ontology/hasNext": "remove_next",
    }

    def __init__(self, meta_config: str, resp_agent: str, save_queries: bool = False):
        with open(meta_config, encoding="utf-8") as file:
            settings = yaml.full_load(file)
        self.endpoint = settings["triplestore_url"]
        self.provenance_endpoint = settings["provenance_triplestore_url"]
        output_dir = settings.get("base_output_dir")
        self.data_hotfix_dir = os.path.join(output_dir, "to_be_uploaded_hotfix")
        self.prov_hotfix_dir = os.path.join(output_dir, "to_be_uploaded_hotfix")
        self.base_dir = os.path.join(output_dir, "rdf") + os.sep
        self.base_iri = settings["base_iri"]
        self.resp_agent = resp_agent
        self.dir_split = settings["dir_split_number"]
        self.n_file_item = settings["items_per_file"]
        self.zip_output_rdf = settings["zip_output_rdf"]
        self.generate_rdf_files = settings.get("generate_rdf_files", True)
        self.reader = Reader()
        self.save_queries = save_queries
        self.update_queries = []

        # Redis configuration
        self.redis_host = settings.get("redis_host", "localhost")
        self.redis_port = settings.get("redis_port", 6379)
        self.redis_db = settings.get("redis_db", 5)
        self.counter_handler = RedisCounterHandler(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )

        self.entity_cache = EntityCache()
        self.relationship_cache = {}

    def update_property(
        self, res: URIRef, property: str, new_value: str | URIRef
    ) -> None:
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        self.reader.import_entity_from_triplestore(
            g_set, self.endpoint, res, self.resp_agent, enable_validation=False
        )
        if validators.url(new_value):
            new_value = URIRef(new_value)
            self.reader.import_entity_from_triplestore(
                g_set,
                self.endpoint,
                new_value,
                self.resp_agent,
                enable_validation=False,
            )
            getattr(g_set.get_entity(res), property)(g_set.get_entity(new_value))
        else:
            getattr(g_set.get_entity(res), property)(new_value)
        self.save(g_set, supplier_prefix)

    def delete(self, res: str, property: str = None, object: str = None) -> None:
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res, self.resp_agent, enable_validation=False
            )
        except ValueError as e:
            print(f"ValueError for entity {res}: {e}")
            inferred_type = self.infer_type_from_uri(res)
            if inferred_type:
                print(f"Inferred type {inferred_type} for entity {res}")
                query: str = (
                    f"SELECT ?s ?p ?o WHERE {{BIND (<{res}> AS ?s). ?s ?p ?o.}}"
                )
                with SPARQLClient(self.endpoint, max_retries=3, backoff_factor=0.3, timeout=3600) as client:
                    result = client.query(query)["results"]["bindings"]
                preexisting_graph: Graph = build_graph_from_results(result)
                self.add_entity_with_type(g_set, res, inferred_type, preexisting_graph)
            else:
                return
        if not g_set.get_entity(URIRef(res)):
            return
        if property:
            remove_method = (
                self.property_to_remove_method[property]
                if property in self.property_to_remove_method
                else (
                    property.replace("has", "remove")
                    if property.startswith("has")
                    else f"remove_{property}"
                )
            )
            if object:
                if validators.url(object):
                    self.reader.import_entity_from_triplestore(
                        g_set,
                        self.endpoint,
                        object,
                        self.resp_agent,
                        enable_validation=False,
                    )
                    # Note: some remove_* methods (e.g. AgentRole.remove_is_held_by)
                    # take no positional argument, so passing the related entity can
                    # raise a TypeError for those properties.
                    getattr(g_set.get_entity(URIRef(res)), remove_method)(
                        g_set.get_entity(URIRef(object))
                    )
                else:
                    getattr(g_set.get_entity(URIRef(res)), remove_method)(object)
            else:
                getattr(g_set.get_entity(URIRef(res)), remove_method)()
        else:
            query = f"SELECT ?s WHERE {{?s ?p <{res}>.}}"
            with SPARQLClient(self.endpoint, max_retries=3, backoff_factor=0.3, timeout=3600) as client:
                result = client.query(query)
            for entity in result["results"]["bindings"]:
                self.reader.import_entity_from_triplestore(
                    g_set,
                    self.endpoint,
                    URIRef(entity["s"]["value"]),
                    self.resp_agent,
                    enable_validation=False,
                )
            entity_to_purge = g_set.get_entity(URIRef(res))
            entity_to_purge.mark_as_to_be_deleted()
        self.save(g_set, supplier_prefix)

    def merge(self, g_set: GraphSet, res: URIRef, other: URIRef) -> None:
        """
        Merge two entities and their related entities using batch import with caching.

        Args:
            g_set: The GraphSet containing the entities
            res: The main entity that will absorb the other
            other: The entity to be merged into the main one
        """
        # First get all related entities with a single SPARQL query
        related_entities = set()
        with SPARQLClient(self.endpoint, max_retries=5, backoff_factor=0.3, timeout=3600) as client:
            if other in self.relationship_cache:
                related_entities.update(self.relationship_cache[other])
            else:
                query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    {{?entity ?p <{other}>}} UNION
                    {{<{other}> ?p ?entity}}
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}"""

                data = client.query(query)
                other_related = {
                    URIRef(result["entity"]["value"])
                    for result in data["results"]["bindings"]
                    if result["entity"]["type"] == "uri"
                }

                self.relationship_cache[other] = other_related
                related_entities.update(other_related)
            if res in self.relationship_cache:
                related_entities.update(self.relationship_cache[res])
            else:
                # Query only for objects of the surviving entity if not in cache
                query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    <{res}> ?p ?entity
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}"""

                data = client.query(query)
                res_related = {
                    URIRef(result["entity"]["value"])
                    for result in data["results"]["bindings"]
                    if result["entity"]["type"] == "uri"
                }

                self.relationship_cache[res] = res_related
                related_entities.update(res_related)

        entities_to_import = set([res, other])
        entities_to_import.update(related_entities)
        entities_to_import = {
            e for e in entities_to_import if not self.entity_cache.is_cached(e)
        }
        # Import only non-cached entities if there are any
        if entities_to_import:
            try:
                self.reader.import_entities_from_triplestore(
                    g_set=g_set,
                    ts_url=self.endpoint,
                    entities=list(entities_to_import),
                    resp_agent=self.resp_agent,
                    enable_validation=False,
                    batch_size=10,
                )

                # Add newly imported entities to cache
                for entity in entities_to_import:
                    self.entity_cache.add(entity)

            except ValueError as e:
                print(f"Error importing entities: {e}")
                return

        # Perform the merge
        res_as_entity = g_set.get_entity(res)
        other_as_entity = g_set.get_entity(other)

        is_both_expression = all(
            GraphEntity.iri_expression in entity.g.objects(entity.res, RDF.type)
            for entity in [res_as_entity, other_as_entity]
        )

        if is_both_expression:
            res_as_entity.merge(other_as_entity, prefer_self=True)
        else:
            res_as_entity.merge(other_as_entity)

    def sync_rdf_with_triplestore(self, res: str, source_uri: str = None) -> bool:
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res, self.resp_agent, enable_validation=False
            )
            self.save(g_set, supplier_prefix)
            return True
        except ValueError:
            try:
                self.reader.import_entity_from_triplestore(
                    g_set,
                    self.endpoint,
                    source_uri,
                    self.resp_agent,
                    enable_validation=False,
                )
            except ValueError:
                res_filepath = self.find_file(
                    self.base_dir,
                    self.dir_split,
                    self.n_file_item,
                    source_uri,
                    self.zip_output_rdf,
                )
                if not res_filepath:
                    return False
                imported_graph = self.reader.load(res_filepath)
                self.reader.import_entities_from_graph(
                    g_set, imported_graph, self.resp_agent
                )
            res_entity = g_set.get_entity(URIRef(source_uri))
            if res_entity:
                for res, entity in g_set.res_to_entity.items():
                    triples_list = list(
                        entity.g.triples((URIRef(source_uri), None, None))
                    )
                    for triple in triples_list:
                        entity.g.remove(triple)
            self.save(g_set, supplier_prefix)
            return False

    def save(self, g_set: GraphSet, supplier_prefix: str = "") -> None:
        provset = ProvSet(
            g_set,
            self.base_iri,
            wanted_label=False,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        provset.generate_provenance()
        graph_storer = Storer(
            g_set,
            dir_split=self.dir_split,
            n_file_item=self.n_file_item,
            zip_output=self.zip_output_rdf,
        )
        prov_storer = Storer(
            provset,
            dir_split=self.dir_split,
            n_file_item=self.n_file_item,
            zip_output=self.zip_output_rdf,
        )

        if self.generate_rdf_files:
            graph_storer.store_all(self.base_dir, self.base_iri)
            prov_storer.store_all(self.base_dir, self.base_iri)

        graph_storer.upload_all(
            self.endpoint, base_dir=self.data_hotfix_dir, save_queries=self.save_queries
        )
        prov_storer.upload_all(
            self.provenance_endpoint, base_dir=self.prov_hotfix_dir, save_queries=self.save_queries
        )
        g_set.commit_changes()

    def __get_supplier_prefix(self, uri: str) -> str:
        entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?([1-9][0-9]*)$"
        entity_match = re.match(entity_regex, uri)
        return entity_match.group(3)

    def find_file(
        self,
        rdf_dir: str,
        dir_split_number: int,
        items_per_file: int,
        uri: str,
        zip_output_rdf: bool,
    ) -> str | None:
        entity_regex: str = (
            r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
        )
        entity_match = re.match(entity_regex, uri)
        if entity_match:
            cur_number = int(entity_match.group(4))
            cur_file_split: int = 0
            while True:
                if cur_number > cur_file_split:
                    cur_file_split += items_per_file
                else:
                    break
            cur_split: int = 0
            while True:
                if cur_number > cur_split:
                    cur_split += dir_split_number
                else:
                    break
            short_name = entity_match.group(2)
            sub_folder = entity_match.group(3)
            cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
            extension = ".zip" if zip_output_rdf else ".json"
            cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + extension
            return cur_file_path

    def infer_type_from_uri(self, uri: str) -> str:
        if os.path.join(self.base_iri, "br") in uri:
            return GraphEntity.iri_expression
        elif os.path.join(self.base_iri, "ar") in uri:
            return GraphEntity.iri_role_in_time
        elif os.path.join(self.base_iri, "ra") in uri:
            return GraphEntity.iri_agent
        elif os.path.join(self.base_iri, "re") in uri:
            return GraphEntity.iri_manifestation
        elif os.path.join(self.base_iri, "id") in uri:
            return GraphEntity.iri_identifier
        return None

    def add_entity_with_type(
        self,
        g_set: GraphSet,
        res: str,
        entity_type: str,
        preexisting_graph: Graph,
    ):
        subject = URIRef(res)
        if entity_type == GraphEntity.iri_expression:
            g_set.add_br(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_role_in_time:
            g_set.add_ar(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_agent:
            g_set.add_ra(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_manifestation:
            g_set.add_re(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_identifier:
            g_set.add_id(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
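
For orientation, a minimal usage sketch follows. It is not part of the measured module: the config path, responsible-agent ORCID, and entity URIs are hypothetical, and it assumes a meta_config.yaml containing the keys read in __init__ (triplestore_url, provenance_triplestore_url, base_output_dir, base_iri, dir_split_number, items_per_file, zip_output_rdf, plus the optional Redis settings).

# Usage sketch; config path, resp_agent and URIs below are illustrative assumptions.
from rdflib import URIRef

from oc_meta.plugins.editor import MetaEditor
from oc_ocdm.graph import GraphSet

editor = MetaEditor(
    meta_config="meta_config.yaml",                      # hypothetical config file
    resp_agent="https://orcid.org/0000-0000-0000-0000",  # hypothetical responsible agent
)

# Rewrite a single property of a bibliographic resource (has_title is an oc_ocdm setter)
editor.update_property(
    URIRef("https://w3id.org/oc/meta/br/0601"), "has_title", "New title"
)

# Remove one identifier from the same resource
editor.delete(
    "https://w3id.org/oc/meta/br/0601",
    property="http://purl.org/spar/datacite/hasIdentifier",
    object="https://w3id.org/oc/meta/id/0601",
)

# Merge two responsible agents, then persist data and provenance explicitly
g_set = GraphSet(editor.base_iri, custom_counter_handler=editor.counter_handler)
editor.merge(
    g_set,
    URIRef("https://w3id.org/oc/meta/ra/0601"),
    URIRef("https://w3id.org/oc/meta/ra/0602"),
)
editor.save(g_set)

Note that update_property and delete persist their changes internally via save, while merge expects the caller to supply a GraphSet and invoke save afterwards, as shown above.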