Coverage for oc_meta/core/editor.py: 66%
224 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-21 09:24 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
7from __future__ import annotations
9import os
10import re
12import validators
13import yaml
14from oc_ocdm import Storer
15from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
16from oc_ocdm.graph import GraphSet
17from oc_ocdm.graph.graph_entity import GraphEntity
18from triplelite import RDFTerm, SubgraphView
19from oc_ocdm.prov import ProvSet
20from oc_ocdm.reader import Reader
21from oc_ocdm.support.support import build_graph_from_results
22from sparqlite import SPARQLClient
class EntityCache:
    """In-memory record of entity URIs that have already been imported."""

    def __init__(self):
        # URIs seen so far; membership tests drive import deduplication.
        self.cache: set[str] = set()

    def add(self, entity: str) -> None:
        """Record *entity* as already imported."""
        self.cache.add(entity)

    def is_cached(self, entity: str) -> bool:
        """Return True when *entity* was previously recorded."""
        return entity in self.cache

    def clear(self) -> None:
        """Forget every recorded entity."""
        self.cache.clear()
class MetaEditor:
    """Editor for OpenCitations Meta entities stored in a triplestore and in
    the on-disk RDF file layout (update, delete, merge, sync operations)."""

    # Maps a predicate IRI to the name of the oc_ocdm remover method that
    # deletes that kind of assertion from an entity.  Predicates absent from
    # this table fall back to a name derived from the predicate local name.
    # NOTE(review): isDocumentContextFor maps to "remove_is_held_by", the same
    # remover as isHeldBy — confirm this duplication is intended.
    property_to_remove_method = {
        "http://purl.org/spar/datacite/hasIdentifier": "remove_identifier",
        "http://purl.org/spar/pro/isHeldBy": "remove_is_held_by",
        "http://purl.org/vocab/frbr/core#embodiment": "remove_format",
        "http://purl.org/spar/pro/isDocumentContextFor": "remove_is_held_by",
        "https://w3id.org/oc/ontology/hasNext": "remove_next",
    }
    def __init__(self, meta_config: str, resp_agent: str, save_queries: bool = False):
        """Initialise the editor from a Meta YAML configuration file.

        Args:
            meta_config: path to the YAML settings file.
            resp_agent: URI of the responsible agent recorded in provenance.
            save_queries: when True, upload steps also persist the SPARQL
                update queries they generate.
        """
        with open(meta_config, encoding="utf-8") as file:
            settings = yaml.full_load(file)
        # Data and provenance may live on different SPARQL endpoints.
        self.endpoint = settings["triplestore_url"]
        self.provenance_endpoint = settings["provenance_triplestore_url"]
        output_dir = settings.get("base_output_dir")
        # NOTE(review): data and provenance hotfix dirs point to the very same
        # folder — confirm this is intended and not a copy-paste slip.
        self.data_hotfix_dir = os.path.join(output_dir, "to_be_uploaded_hotfix")
        self.prov_hotfix_dir = os.path.join(output_dir, "to_be_uploaded_hotfix")
        self.base_dir = os.path.join(output_dir, "rdf") + os.sep
        self.base_iri = settings["base_iri"]
        self.resp_agent = resp_agent
        # Directory/file sharding parameters shared by Storer and find_file.
        self.dir_split = settings["dir_split_number"]
        self.n_file_item = settings["items_per_file"]
        self.zip_output_rdf = settings["zip_output_rdf"]
        # When True, save() only writes files and skips triplestore uploads.
        self.rdf_files_only = settings.get("rdf_files_only", False)
        self.reader = Reader()
        self.save_queries = save_queries
        self.update_queries = []
        # Redis configuration (backs the oc_ocdm counter handler).
        self.redis_host = settings.get("redis_host", "localhost")
        self.redis_port = settings.get("redis_port", 6379)
        self.redis_db = settings.get("redis_db", 5)
        self.counter_handler = RedisCounterHandler(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )
        # Caches that avoid re-importing entities / re-querying relationships.
        self.entity_cache = EntityCache()
        self.relationship_cache = {}
78 def update_property(
79 self, res: str, property: str, new_value: str
80 ) -> None:
81 supplier_prefix = self.__get_supplier_prefix(res)
82 g_set = GraphSet(
83 self.base_iri,
84 supplier_prefix=supplier_prefix,
85 custom_counter_handler=self.counter_handler,
86 )
87 self.reader.import_entity_from_triplestore(
88 g_set, self.endpoint, res, self.resp_agent, enable_validation=False
89 )
90 if validators.url(new_value):
91 self.reader.import_entity_from_triplestore(
92 g_set,
93 self.endpoint,
94 new_value,
95 self.resp_agent,
96 enable_validation=False,
97 )
98 getattr(g_set.get_entity(res), property)(g_set.get_entity(new_value))
99 else:
100 getattr(g_set.get_entity(res), property)(new_value)
101 self.save(g_set, supplier_prefix)
    def delete(self, res: str, property: str | None = None, object: str | None = None) -> None:
        """Remove an assertion from *res*, or purge the whole entity.

        With *property* set, the matching oc_ocdm remover method is invoked
        (optionally restricted to *object*).  Without *property*, every entity
        pointing at *res* is imported so its reference can be updated, then
        *res* is marked for deletion.  Changes are persisted via ``save``.

        Args:
            res: URI of the entity to edit.
            property: predicate IRI to remove; None deletes the entity.
            object: specific object value to remove; None removes all values.
        """
        res_str = str(res)
        supplier_prefix = self.__get_supplier_prefix(res_str)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res_str, self.resp_agent, enable_validation=False
            )
        except ValueError as e:
            # Import failed (e.g. missing/invalid rdf:type in the store): try
            # to rebuild the entity from its URI shape and its raw triples.
            print(f"ValueError for entity {res_str}: {e}")
            inferred_type = self.infer_type_from_uri(res_str)
            if inferred_type:
                print(f"Inferred type {inferred_type} for entity {res_str}")
                query: str = (
                    f"SELECT ?s ?p ?o WHERE {{BIND (<{res_str}> AS ?s). ?s ?p ?o.}}"
                )
                with SPARQLClient(self.endpoint, max_retries=3, backoff_factor=0.3, timeout=3600) as client:
                    result = client.query(query)["results"]["bindings"]
                graph = build_graph_from_results(result)
                preexisting_graph = graph.subgraph(res_str)
                self.add_entity_with_type(g_set, res_str, inferred_type, preexisting_graph)
            else:
                # No type could be inferred: nothing we can safely delete.
                return
        if not g_set.get_entity(res_str):
            return
        if property:
            # Resolve the remover: explicit mapping first, then the
            # has* -> remove* convention, then a remove_<property> fallback.
            # NOTE(review): str.replace swaps every "has" occurrence, not just
            # a prefix — confirm predicates never contain "has" elsewhere.
            remove_method = (
                self.property_to_remove_method[property]
                if property in self.property_to_remove_method
                else (
                    property.replace("has", "remove")
                    if property.startswith("has")
                    else f"remove_{property}"
                )
            )
            if object:
                if validators.url(object):
                    # Object is an entity: import it so the remover gets an
                    # entity instance rather than a bare string.
                    self.reader.import_entity_from_triplestore(
                        g_set,
                        self.endpoint,
                        object,
                        self.resp_agent,
                        enable_validation=False,
                    )
                    getattr(g_set.get_entity(res_str), remove_method)(
                        g_set.get_entity(object)
                    )
                else:
                    getattr(g_set.get_entity(res_str), remove_method)(object)
            else:
                # No object given: drop every value of the property.
                getattr(g_set.get_entity(res_str), remove_method)()
        else:
            # Full entity deletion: import all entities referencing res so
            # their dangling links can be updated by oc_ocdm on save.
            query = f"SELECT ?s WHERE {{?s ?p <{res_str}>.}}"
            with SPARQLClient(self.endpoint, max_retries=3, backoff_factor=0.3, timeout=3600) as client:
                result = client.query(query)
                for entity in result["results"]["bindings"]:
                    self.reader.import_entity_from_triplestore(
                        g_set,
                        self.endpoint,
                        entity["s"]["value"],
                        self.resp_agent,
                        enable_validation=False,
                    )
            entity_to_purge = g_set.get_entity(res_str)
            if not entity_to_purge:
                return
            entity_to_purge.mark_as_to_be_deleted()
        self.save(g_set, supplier_prefix)
    def merge(self, g_set: GraphSet, res: str, other: str) -> None:
        """Merge entity *other* into *res* inside *g_set*.

        Entities linked to either side (excluding rdf:type,
        datacite:usesIdentifierScheme and pro:withRole predicates) are
        imported first so references to *other* can be rewritten.  The
        relationship queries are memoised in ``self.relationship_cache``.

        Raises:
            ValueError: if either entity is missing from *g_set* after import.
        """
        related_entities: set[str] = set()
        with SPARQLClient(self.endpoint, max_retries=5, backoff_factor=0.3, timeout=3600) as client:
            if other in self.relationship_cache:
                related_entities.update(self.relationship_cache[other])
            else:
                # For *other*, both incoming and outgoing links are collected.
                query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    {{?entity ?p <{other}>}} UNION
                    {{<{other}> ?p ?entity}}
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}"""
                data = client.query(query)
                other_related = {
                    result["entity"]["value"]
                    for result in data["results"]["bindings"]
                    if result["entity"]["type"] == "uri"
                }
                self.relationship_cache[other] = other_related
                related_entities.update(other_related)
            if res in self.relationship_cache:
                related_entities.update(self.relationship_cache[res])
            else:
                # For *res*, only outgoing links are collected (incoming ones
                # keep pointing at res and need no rewrite).
                query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    <{res}> ?p ?entity
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}"""
                data = client.query(query)
                res_related = {
                    result["entity"]["value"]
                    for result in data["results"]["bindings"]
                    if result["entity"]["type"] == "uri"
                }
                self.relationship_cache[res] = res_related
                related_entities.update(res_related)
        # Import both endpoints plus everything related, skipping entities
        # already brought into previous GraphSets (tracked by entity_cache).
        entities_to_import = {res, other}
        entities_to_import.update(related_entities)
        entities_to_import = {
            e for e in entities_to_import if not self.entity_cache.is_cached(e)
        }
        if entities_to_import:
            try:
                self.reader.import_entities_from_triplestore(
                    g_set=g_set,
                    ts_url=self.endpoint,
                    entities=list(entities_to_import),
                    resp_agent=self.resp_agent,
                    enable_validation=False,
                    batch_size=10,
                )
                for entity in entities_to_import:
                    self.entity_cache.add(entity)
            except ValueError as e:
                # Best-effort: abort the merge rather than corrupt the set.
                print(f"Error importing entities: {e}")
                return
        res_as_entity = g_set.get_entity(res)
        other_as_entity = g_set.get_entity(other)
        if not res_as_entity or not other_as_entity:
            raise ValueError(f"Entity not found in GraphSet: res={res}, other={other}")
        # When both sides are fabio Expressions (bibliographic resources),
        # res's own data wins over other's on conflicting assertions.
        expression_term = RDFTerm("uri", GraphEntity.iri_expression)
        rdf_type = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
        is_both_expression = all(
            expression_term in entity.g.objects(entity.res, rdf_type)
            for entity in [res_as_entity, other_as_entity]
        )
        if is_both_expression:
            res_as_entity.merge(other_as_entity, prefer_self=True)
        else:
            res_as_entity.merge(other_as_entity)
    def sync_rdf_with_triplestore(self, res: str, source_uri: str | None = None) -> bool:
        """Align the on-disk RDF files for *res* with the triplestore.

        Returns True when *res* exists in the triplestore and was re-saved to
        disk.  Returns False otherwise; in that case, when *source_uri* is
        given and is also absent from the triplestore, its stale triples are
        stripped from the local RDF file and the cleaned file is re-saved.
        """
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res, self.resp_agent, enable_validation=False
            )
            self.save(g_set, supplier_prefix)
            return True
        except ValueError:
            # *res* is not in the triplestore.
            if not source_uri:
                return False
            try:
                self.reader.import_entity_from_triplestore(
                    g_set,
                    self.endpoint,
                    source_uri,
                    self.resp_agent,
                    enable_validation=False,
                )
                # The source entity still exists remotely: nothing to clean.
                return False
            except ValueError:
                # Neither entity exists remotely: locate the local file that
                # should hold source_uri and purge its stale triples.
                res_filepath = self.find_file(
                    self.base_dir,
                    self.dir_split,
                    self.n_file_item,
                    source_uri,
                    self.zip_output_rdf,
                )
                if not res_filepath:
                    return False
                imported_graph = self.reader.load(res_filepath)
                if not imported_graph:
                    return False
                self.reader.import_entities_from_graph(
                    g_set, imported_graph, self.resp_agent
                )
                res_entity = g_set.get_entity(source_uri)
                if res_entity:
                    for entity_res, entity in g_set.res_to_entity.items():
                        # NOTE(review): source_uri is a plain str here, while
                        # rdflib-style triple matching normally expects a
                        # URIRef — confirm this removal actually matches.
                        triples_list = list(
                            entity.g.triples((source_uri, None, None))
                        )
                        for triple in triples_list:
                            entity.g.remove(triple)
                    self.save(g_set, supplier_prefix)
                return False
319 def save(self, g_set: GraphSet, supplier_prefix: str = "") -> None:
320 provset = ProvSet(
321 g_set,
322 self.base_iri,
323 wanted_label=False,
324 supplier_prefix=supplier_prefix,
325 custom_counter_handler=self.counter_handler,
326 )
327 provset.generate_provenance()
328 graph_storer = Storer(
329 g_set,
330 dir_split=self.dir_split,
331 n_file_item=self.n_file_item,
332 zip_output=self.zip_output_rdf,
333 )
334 prov_storer = Storer(
335 provset,
336 dir_split=self.dir_split,
337 n_file_item=self.n_file_item,
338 zip_output=self.zip_output_rdf,
339 )
341 graph_storer.store_all(self.base_dir, self.base_iri)
342 prov_storer.store_all(self.base_dir, self.base_iri)
344 if not self.rdf_files_only:
345 graph_storer.upload_all(
346 self.endpoint, base_dir=self.data_hotfix_dir, save_queries=self.save_queries
347 )
348 prov_storer.upload_all(
349 self.provenance_endpoint, base_dir=self.prov_hotfix_dir, save_queries=self.save_queries
350 )
351 g_set.commit_changes()
353 def __get_supplier_prefix(self, uri: str) -> str:
354 entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)([1-9][0-9]*)$"
355 entity_match = re.match(entity_regex, uri)
356 if not entity_match:
357 raise ValueError(f"Invalid entity URI: {uri}")
358 return entity_match.group(3)
360 def find_file(
361 self,
362 rdf_dir: str,
363 dir_split_number: int,
364 items_per_file: int,
365 uri: str,
366 zip_output_rdf: bool,
367 ) -> str | None:
368 entity_regex: str = (
369 r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
370 )
371 entity_match = re.match(entity_regex, uri)
372 if entity_match:
373 cur_number = int(entity_match.group(4))
374 cur_file_split: int = 0
375 while True:
376 if cur_number > cur_file_split:
377 cur_file_split += items_per_file
378 else:
379 break
380 cur_split: int = 0
381 while True:
382 if cur_number > cur_split:
383 cur_split += dir_split_number
384 else:
385 break
386 short_name = entity_match.group(2)
387 sub_folder = entity_match.group(3)
388 cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
389 extension = ".zip" if zip_output_rdf else ".json"
390 cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + extension
391 return cur_file_path
393 def infer_type_from_uri(self, uri: str) -> str | None:
394 if os.path.join(self.base_iri, "br") in uri:
395 return GraphEntity.iri_expression
396 elif os.path.join(self.base_iri, "ar") in uri:
397 return GraphEntity.iri_role_in_time
398 elif os.path.join(self.base_iri, "ra") in uri:
399 return GraphEntity.iri_agent
400 elif os.path.join(self.base_iri, "re") in uri:
401 return GraphEntity.iri_manifestation
402 elif os.path.join(self.base_iri, "id") in uri:
403 return GraphEntity.iri_identifier
404 return None
406 def add_entity_with_type(
407 self,
408 g_set: GraphSet,
409 res: str,
410 entity_type: str,
411 preexisting_graph: SubgraphView | None,
412 ):
413 if entity_type == GraphEntity.iri_expression:
414 g_set.add_br(
415 resp_agent=self.resp_agent,
416 res=res,
417 preexisting_graph=preexisting_graph,
418 )
419 elif entity_type == GraphEntity.iri_role_in_time:
420 g_set.add_ar(
421 resp_agent=self.resp_agent,
422 res=res,
423 preexisting_graph=preexisting_graph,
424 )
425 elif entity_type == GraphEntity.iri_agent:
426 g_set.add_ra(
427 resp_agent=self.resp_agent,
428 res=res,
429 preexisting_graph=preexisting_graph,
430 )
431 elif entity_type == GraphEntity.iri_manifestation:
432 g_set.add_re(
433 resp_agent=self.resp_agent,
434 res=res,
435 preexisting_graph=preexisting_graph,
436 )
437 elif entity_type == GraphEntity.iri_identifier:
438 g_set.add_id(
439 resp_agent=self.resp_agent,
440 res=res,
441 preexisting_graph=preexisting_graph,
442 )