Coverage for oc_meta / core / editor.py: 66%

224 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-21 09:24 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7from __future__ import annotations 

8 

9import os 

10import re 

11 

12import validators 

13import yaml 

14from oc_ocdm import Storer 

15from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler 

16from oc_ocdm.graph import GraphSet 

17from oc_ocdm.graph.graph_entity import GraphEntity 

18from triplelite import RDFTerm, SubgraphView 

19from oc_ocdm.prov import ProvSet 

20from oc_ocdm.reader import Reader 

21from oc_ocdm.support.support import build_graph_from_results 

22from sparqlite import SPARQLClient 

23 

24 

class EntityCache:
    """Remember which entity IRIs have already been imported.

    Used to skip redundant triplestore fetches when the same entity shows
    up in several merge operations.
    """

    def __init__(self) -> None:
        # IRIs seen so far; membership tests are O(1).
        self.cache: set[str] = set()

    def add(self, entity: str) -> None:
        """Record *entity* as already imported."""
        self.cache.add(entity)

    def is_cached(self, entity: str) -> bool:
        """Return ``True`` when *entity* was previously added."""
        return entity in self.cache

    def clear(self) -> None:
        """Forget every cached entity (mutates the set in place)."""
        self.cache.clear()

37 

38 

class MetaEditor:
    """High-level editor for OpenCitations Meta entities in a triplestore.

    Supports updating a property, deleting an entity (or one of its
    triples), merging two entities, and re-syncing the on-disk RDF with the
    triplestore.  Every change is persisted through :meth:`save`, which also
    generates and stores provenance.
    """

    # Properties whose oc_ocdm remover name cannot be derived mechanically
    # from the property name are mapped to the correct method here.
    property_to_remove_method = {
        "http://purl.org/spar/datacite/hasIdentifier": "remove_identifier",
        "http://purl.org/spar/pro/isHeldBy": "remove_is_held_by",
        "http://purl.org/vocab/frbr/core#embodiment": "remove_format",
        "http://purl.org/spar/pro/isDocumentContextFor": "remove_is_held_by",
        "https://w3id.org/oc/ontology/hasNext": "remove_next",
    }

    def __init__(self, meta_config: str, resp_agent: str, save_queries: bool = False):
        """Load configuration from the YAML file *meta_config*.

        :param meta_config: path to the Meta YAML configuration file
        :param resp_agent: IRI of the agent recorded as responsible in provenance
        :param save_queries: when True, ``Storer.upload_all`` also saves the
            SPARQL update queries it executes
        """
        with open(meta_config, encoding="utf-8") as file:
            settings = yaml.full_load(file)
        self.endpoint = settings["triplestore_url"]
        self.provenance_endpoint = settings["provenance_triplestore_url"]
        output_dir = settings.get("base_output_dir")
        # NOTE(review): data and provenance hotfix dirs point to the same
        # folder — confirm whether provenance should use a separate one.
        self.data_hotfix_dir = os.path.join(output_dir, "to_be_uploaded_hotfix")
        self.prov_hotfix_dir = os.path.join(output_dir, "to_be_uploaded_hotfix")
        self.base_dir = os.path.join(output_dir, "rdf") + os.sep
        self.base_iri = settings["base_iri"]
        self.resp_agent = resp_agent
        self.dir_split = settings["dir_split_number"]
        self.n_file_item = settings["items_per_file"]
        self.zip_output_rdf = settings["zip_output_rdf"]
        self.rdf_files_only = settings.get("rdf_files_only", False)
        self.reader = Reader()
        self.save_queries = save_queries
        self.update_queries = []

        # Redis configuration for the oc_ocdm counter handler.
        self.redis_host = settings.get("redis_host", "localhost")
        self.redis_port = settings.get("redis_port", 6379)
        self.redis_db = settings.get("redis_db", 5)
        self.counter_handler = RedisCounterHandler(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )

        # Caches that avoid re-importing entities and re-running
        # relationship queries across repeated merge() calls.
        self.entity_cache = EntityCache()
        self.relationship_cache = {}

    def update_property(self, res: str, property: str, new_value: str) -> None:
        """Set *property* of entity *res* to *new_value* and persist.

        *property* is the name of the oc_ocdm setter method (resolved with
        ``getattr``).  When *new_value* is a URL the referenced entity is
        imported first and passed as an object; otherwise the raw literal
        value is used.
        """
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        self.reader.import_entity_from_triplestore(
            g_set, self.endpoint, res, self.resp_agent, enable_validation=False
        )
        if validators.url(new_value):
            self.reader.import_entity_from_triplestore(
                g_set,
                self.endpoint,
                new_value,
                self.resp_agent,
                enable_validation=False,
            )
            getattr(g_set.get_entity(res), property)(g_set.get_entity(new_value))
        else:
            getattr(g_set.get_entity(res), property)(new_value)
        self.save(g_set, supplier_prefix)

    def _resolve_remove_method(self, property: str) -> str:
        """Map *property* (IRI or oc_ocdm accessor name) to its remover name.

        Fix over the previous ``property.replace("has", "remove")``: only the
        leading "has" is substituted, so a later occurrence of "has" in the
        name is no longer rewritten by accident.
        """
        if property in self.property_to_remove_method:
            return self.property_to_remove_method[property]
        if property.startswith("has"):
            return "remove" + property[len("has"):]
        return f"remove_{property}"

    def delete(self, res: str, property: str | None = None, object: str | None = None) -> None:
        """Delete entity *res*, one of its properties, or a single triple.

        - only *res*: mark the whole entity as deleted, importing every
          entity that references it so dangling links are handled on save;
        - *res* + *property*: call the matching remover with no argument;
        - *res* + *property* + *object*: remove only that specific value.

        ``property`` and ``object`` shadow builtins but are kept for
        backward compatibility with keyword callers.
        """
        res_str = str(res)
        supplier_prefix = self.__get_supplier_prefix(res_str)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res_str, self.resp_agent, enable_validation=False
            )
        except ValueError as e:
            # Entity could not be imported (e.g. missing type): try to
            # rebuild it from its raw triples using a type inferred from
            # the URI; give up when even that is impossible.
            print(f"ValueError for entity {res_str}: {e}")
            inferred_type = self.infer_type_from_uri(res_str)
            if not inferred_type:
                return
            print(f"Inferred type {inferred_type} for entity {res_str}")
            query: str = (
                f"SELECT ?s ?p ?o WHERE {{BIND (<{res_str}> AS ?s). ?s ?p ?o.}}"
            )
            with SPARQLClient(self.endpoint, max_retries=3, backoff_factor=0.3, timeout=3600) as client:
                result = client.query(query)["results"]["bindings"]
            graph = build_graph_from_results(result)
            preexisting_graph = graph.subgraph(res_str)
            self.add_entity_with_type(g_set, res_str, inferred_type, preexisting_graph)
        if not g_set.get_entity(res_str):
            return
        if property:
            remove_method = self._resolve_remove_method(property)
            if object:
                if validators.url(object):
                    # Import the object entity so the remover receives an
                    # entity, not a bare IRI string.
                    self.reader.import_entity_from_triplestore(
                        g_set,
                        self.endpoint,
                        object,
                        self.resp_agent,
                        enable_validation=False,
                    )
                    getattr(g_set.get_entity(res_str), remove_method)(
                        g_set.get_entity(object)
                    )
                else:
                    getattr(g_set.get_entity(res_str), remove_method)(object)
            else:
                getattr(g_set.get_entity(res_str), remove_method)()
        else:
            # Whole-entity deletion: also import every entity pointing at
            # res so its reference can be cleaned up on save.
            query = f"SELECT ?s WHERE {{?s ?p <{res_str}>.}}"
            with SPARQLClient(self.endpoint, max_retries=3, backoff_factor=0.3, timeout=3600) as client:
                result = client.query(query)
            for entity in result["results"]["bindings"]:
                self.reader.import_entity_from_triplestore(
                    g_set,
                    self.endpoint,
                    entity["s"]["value"],
                    self.resp_agent,
                    enable_validation=False,
                )
            entity_to_purge = g_set.get_entity(res_str)
            if not entity_to_purge:
                return
            entity_to_purge.mark_as_to_be_deleted()
        self.save(g_set, supplier_prefix)

    def merge(self, g_set: GraphSet, res: str, other: str) -> None:
        """Merge entity *other* into *res* inside *g_set*.

        Entities related to either side are imported first so oc_ocdm can
        rewrite their references.  Relationship queries are memoized in
        ``relationship_cache`` and imports in ``entity_cache``.

        :raises ValueError: when either entity is missing from *g_set*
            after import.
        """
        related_entities: set[str] = set()
        with SPARQLClient(self.endpoint, max_retries=5, backoff_factor=0.3, timeout=3600) as client:
            if other in self.relationship_cache:
                related_entities.update(self.relationship_cache[other])
            else:
                # Both directions for the entity that will disappear.
                query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    {{?entity ?p <{other}>}} UNION
                    {{<{other}> ?p ?entity}}
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}"""

                data = client.query(query)
                other_related = {
                    result["entity"]["value"]
                    for result in data["results"]["bindings"]
                    if result["entity"]["type"] == "uri"
                }

                self.relationship_cache[other] = other_related
                related_entities.update(other_related)
            if res in self.relationship_cache:
                related_entities.update(self.relationship_cache[res])
            else:
                # Only outgoing links for the surviving entity.
                query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    <{res}> ?p ?entity
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}"""

                data = client.query(query)
                res_related = {
                    result["entity"]["value"]
                    for result in data["results"]["bindings"]
                    if result["entity"]["type"] == "uri"
                }

                self.relationship_cache[res] = res_related
                related_entities.update(res_related)

        # Import only what is not already in the graph set.
        entities_to_import = {res, other}
        entities_to_import.update(related_entities)
        entities_to_import = {
            e for e in entities_to_import if not self.entity_cache.is_cached(e)
        }
        if entities_to_import:
            try:
                self.reader.import_entities_from_triplestore(
                    g_set=g_set,
                    ts_url=self.endpoint,
                    entities=list(entities_to_import),
                    resp_agent=self.resp_agent,
                    enable_validation=False,
                    batch_size=10,
                )

                for entity in entities_to_import:
                    self.entity_cache.add(entity)

            except ValueError as e:
                print(f"Error importing entities: {e}")
                return

        res_as_entity = g_set.get_entity(res)
        other_as_entity = g_set.get_entity(other)
        if not res_as_entity or not other_as_entity:
            raise ValueError(f"Entity not found in GraphSet: res={res}, other={other}")

        # When both entities are typed as fabio Expression (bibliographic
        # resources), keep res's values on conflicts (prefer_self=True).
        expression_term = RDFTerm("uri", GraphEntity.iri_expression)
        rdf_type = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
        is_both_expression = all(
            expression_term in entity.g.objects(entity.res, rdf_type)
            for entity in (res_as_entity, other_as_entity)
        )

        if is_both_expression:
            res_as_entity.merge(other_as_entity, prefer_self=True)
        else:
            res_as_entity.merge(other_as_entity)

    def sync_rdf_with_triplestore(self, res: str, source_uri: str | None = None) -> bool:
        """Align the on-disk RDF for *res* with the triplestore.

        Returns True when *res* exists on the triplestore and was re-saved.
        Otherwise *source_uri* is used as a fallback: when it exists neither
        on the triplestore nor is needed, stale triples with *source_uri* as
        subject are scrubbed from the local files.  Every fallback path
        returns False.
        """
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res, self.resp_agent, enable_validation=False
            )
            self.save(g_set, supplier_prefix)
            return True
        except ValueError:
            if not source_uri:
                return False
            try:
                # source_uri still on the triplestore: nothing to clean up.
                self.reader.import_entity_from_triplestore(
                    g_set,
                    self.endpoint,
                    source_uri,
                    self.resp_agent,
                    enable_validation=False,
                )
                return False
            except ValueError:
                res_filepath = self.find_file(
                    self.base_dir,
                    self.dir_split,
                    self.n_file_item,
                    source_uri,
                    self.zip_output_rdf,
                )
                if not res_filepath:
                    return False
                imported_graph = self.reader.load(res_filepath)
                if not imported_graph:
                    return False
                self.reader.import_entities_from_graph(
                    g_set, imported_graph, self.resp_agent
                )
                if g_set.get_entity(source_uri):
                    # Drop every triple whose subject is source_uri from all
                    # imported entity graphs, then persist the cleaned files.
                    # NOTE(review): source_uri is a plain str here — confirm
                    # triples() matches it against the stored subject terms.
                    for entity in g_set.res_to_entity.values():
                        for triple in list(entity.g.triples((source_uri, None, None))):
                            entity.g.remove(triple)
                    self.save(g_set, supplier_prefix)
                return False

    def save(self, g_set: GraphSet, supplier_prefix: str = "") -> None:
        """Persist *g_set*: generate provenance, write data and provenance
        to the RDF file store and, unless ``rdf_files_only`` is set, upload
        both to their triplestore endpoints; finally commit the changes."""
        provset = ProvSet(
            g_set,
            self.base_iri,
            wanted_label=False,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        provset.generate_provenance()
        graph_storer = Storer(
            g_set,
            dir_split=self.dir_split,
            n_file_item=self.n_file_item,
            zip_output=self.zip_output_rdf,
        )
        prov_storer = Storer(
            provset,
            dir_split=self.dir_split,
            n_file_item=self.n_file_item,
            zip_output=self.zip_output_rdf,
        )

        graph_storer.store_all(self.base_dir, self.base_iri)
        prov_storer.store_all(self.base_dir, self.base_iri)

        if not self.rdf_files_only:
            graph_storer.upload_all(
                self.endpoint, base_dir=self.data_hotfix_dir, save_queries=self.save_queries
            )
            prov_storer.upload_all(
                self.provenance_endpoint, base_dir=self.prov_hotfix_dir, save_queries=self.save_queries
            )
        g_set.commit_changes()

    def __get_supplier_prefix(self, uri: str) -> str:
        """Extract the supplier prefix (e.g. "060") from an entity URI.

        :raises ValueError: when *uri* does not match the OC Meta pattern.
        """
        entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)([1-9][0-9]*)$"
        entity_match = re.match(entity_regex, uri)
        if not entity_match:
            raise ValueError(f"Invalid entity URI: {uri}")
        return entity_match.group(3)

    def find_file(
        self,
        rdf_dir: str,
        dir_split_number: int,
        items_per_file: int,
        uri: str,
        zip_output_rdf: bool,
    ) -> str | None:
        """Compute the path of the file that stores *uri*.

        Returns None when *uri* does not look like an OC Meta entity IRI.
        """
        entity_regex: str = (
            r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
        )
        entity_match = re.match(entity_regex, uri)
        if not entity_match:
            return None
        cur_number = int(entity_match.group(4))
        # Smallest multiples of items_per_file / dir_split_number that are
        # >= cur_number (cur_number >= 1): ceiling division replaces the
        # previous counting loops.
        cur_file_split = -(-cur_number // items_per_file) * items_per_file
        cur_split = -(-cur_number // dir_split_number) * dir_split_number
        short_name = entity_match.group(2)
        # NOTE(review): the supplier-prefix group is optional in the regex,
        # so sub_folder may be None and os.path.join would raise — confirm
        # whether prefix-less IRIs can reach this method.
        sub_folder = entity_match.group(3)
        cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
        extension = ".zip" if zip_output_rdf else ".json"
        return os.path.join(cur_dir_path, str(cur_file_split)) + extension

    def infer_type_from_uri(self, uri: str) -> str | None:
        """Guess the oc_ocdm type IRI from the short name in the entity URI.

        URL segments are joined with "/" explicitly: the previous
        ``os.path.join`` would use "\\" on Windows and never match.
        """
        base = self.base_iri.rstrip("/")
        if f"{base}/br" in uri:
            return GraphEntity.iri_expression
        if f"{base}/ar" in uri:
            return GraphEntity.iri_role_in_time
        if f"{base}/ra" in uri:
            return GraphEntity.iri_agent
        if f"{base}/re" in uri:
            return GraphEntity.iri_manifestation
        if f"{base}/id" in uri:
            return GraphEntity.iri_identifier
        return None

    def add_entity_with_type(
        self,
        g_set: GraphSet,
        res: str,
        entity_type: str,
        preexisting_graph: SubgraphView | None,
    ):
        """Create the typed entity *res* in *g_set*, seeded with
        *preexisting_graph*.  Unknown types are silently ignored."""
        adders = {
            GraphEntity.iri_expression: g_set.add_br,
            GraphEntity.iri_role_in_time: g_set.add_ar,
            GraphEntity.iri_agent: g_set.add_ra,
            GraphEntity.iri_manifestation: g_set.add_re,
            GraphEntity.iri_identifier: g_set.add_id,
        }
        adder = adders.get(entity_type)
        if adder is not None:
            adder(
                resp_agent=self.resp_agent,
                res=res,
                preexisting_graph=preexisting_graph,
            )