Coverage for oc_meta/plugins/editor.py: 67%
212 statements
coverage.py v6.5.0, created at 2025-12-20 08:55 +0000
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import os
import re
from typing import Set

import validators
import yaml
from oc_ocdm import Storer
from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
from oc_ocdm.graph import GraphSet
from oc_ocdm.graph.graph_entity import GraphEntity
from oc_ocdm.prov import ProvSet
from oc_ocdm.reader import Reader
from oc_ocdm.support.support import build_graph_from_results
from rdflib import RDF, Graph, URIRef
from sparqlite import SPARQLClient

class EntityCache:
    def __init__(self):
        self.cache: Set[URIRef] = set()

    def add(self, entity: URIRef) -> None:
        """Add an entity to the cache"""
        self.cache.add(entity)

    def is_cached(self, entity: URIRef) -> bool:
        """Check if an entity is in the cache"""
        return entity in self.cache

    def clear(self) -> None:
        """Clear all cached entities"""
        self.cache.clear()
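
# Usage sketch (hypothetical IRI): EntityCache is a thin wrapper over a set,
# used below to avoid re-importing entities from the triplestore.
#
#   cache = EntityCache()
#   uri = URIRef("https://w3id.org/oc/meta/br/06101")
#   if not cache.is_cached(uri):
#       cache.add(uri)
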
class MetaEditor:
    property_to_remove_method = {
        "http://purl.org/spar/datacite/hasIdentifier": "remove_identifier",
        "http://purl.org/spar/pro/isHeldBy": "remove_is_held_by",
        "http://purl.org/vocab/frbr/core#embodiment": "remove_format",
        "http://purl.org/spar/pro/isDocumentContextFor": "remove_is_held_by",
        "https://w3id.org/oc/ontology/hasNext": "remove_next",
    }
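
    # Properties not listed above fall back to a name derived in delete():
    # when `property` is passed as an oc_ocdm accessor name, "has_title"
    # becomes "remove_title"; anything else is prefixed with "remove_".
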
    def __init__(self, meta_config: str, resp_agent: str, save_queries: bool = False):
        with open(meta_config, encoding="utf-8") as file:
            settings = yaml.full_load(file)
        self.endpoint = settings["triplestore_url"]
        self.provenance_endpoint = settings["provenance_triplestore_url"]
        output_dir = settings.get("base_output_dir")
        self.data_hotfix_dir = os.path.join(output_dir, "to_be_uploaded_hotfix")
        self.prov_hotfix_dir = os.path.join(output_dir, "to_be_uploaded_hotfix")
        self.base_dir = os.path.join(output_dir, "rdf") + os.sep
        self.base_iri = settings["base_iri"]
        self.resp_agent = resp_agent
        self.dir_split = settings["dir_split_number"]
        self.n_file_item = settings["items_per_file"]
        self.zip_output_rdf = settings["zip_output_rdf"]
        self.generate_rdf_files = settings.get("generate_rdf_files", True)
        self.reader = Reader()
        self.save_queries = save_queries
        self.update_queries = []

        # Redis configuration
        self.redis_host = settings.get("redis_host", "localhost")
        self.redis_port = settings.get("redis_port", 6379)
        self.redis_db = settings.get("redis_db", 5)
        self.counter_handler = RedisCounterHandler(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )

        self.entity_cache = EntityCache()
        self.relationship_cache = {}
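
    # Minimal meta_config sketch, listing only the keys read above (values
    # are hypothetical; defaults apply where .get() is used):
    #
    #   triplestore_url: http://localhost:8890/sparql
    #   provenance_triplestore_url: http://localhost:8891/sparql
    #   base_output_dir: /data/oc_meta
    #   base_iri: https://w3id.org/oc/meta/
    #   dir_split_number: 10000
    #   items_per_file: 1000
    #   zip_output_rdf: true
    #   generate_rdf_files: true   # optional, defaults to true
    #   redis_host: localhost      # optional
    #   redis_port: 6379           # optional
    #   redis_db: 5                # optional
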
    def update_property(
        self, res: URIRef, property: str, new_value: str | URIRef
    ) -> None:
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        self.reader.import_entity_from_triplestore(
            g_set, self.endpoint, res, self.resp_agent, enable_validation=False
        )
        if validators.url(new_value):
            new_value = URIRef(new_value)
            self.reader.import_entity_from_triplestore(
                g_set,
                self.endpoint,
                new_value,
                self.resp_agent,
                enable_validation=False,
            )
            getattr(g_set.get_entity(res), property)(g_set.get_entity(new_value))
        else:
            getattr(g_set.get_entity(res), property)(new_value)
        self.save(g_set, supplier_prefix)
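
    # Usage sketch (hypothetical IRI and value; `property` must be the name
    # of an oc_ocdm setter on the imported entity, e.g. has_title on a
    # bibliographic resource):
    #
    #   editor.update_property(
    #       URIRef("https://w3id.org/oc/meta/br/06101"), "has_title", "New title"
    #   )
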
    def delete(self, res: str, property: str | None = None, object: str | None = None) -> None:
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res, self.resp_agent, enable_validation=False
            )
        except ValueError as e:
            print(f"ValueError for entity {res}: {e}")
            inferred_type = self.infer_type_from_uri(res)
            if inferred_type:
                print(f"Inferred type {inferred_type} for entity {res}")
                query: str = (
                    f"SELECT ?s ?p ?o WHERE {{BIND (<{res}> AS ?s). ?s ?p ?o.}}"
                )
                with SPARQLClient(self.endpoint, max_retries=3, backoff_factor=0.3, timeout=3600) as client:
                    result = client.query(query)["results"]["bindings"]
                preexisting_graph: Graph = build_graph_from_results(result)
                self.add_entity_with_type(g_set, res, inferred_type, preexisting_graph)
            else:
                return
        if not g_set.get_entity(URIRef(res)):
            return
        if property:
            remove_method = (
                self.property_to_remove_method[property]
                if property in self.property_to_remove_method
                else (
                    property.replace("has", "remove")
                    if property.startswith("has")
                    else f"remove_{property}"
                )
            )
            if object:
                if validators.url(object):
                    self.reader.import_entity_from_triplestore(
                        g_set,
                        self.endpoint,
                        object,
                        self.resp_agent,
                        enable_validation=False,
                    )
                    # Some removers (e.g. AgentRole.remove_is_held_by) accept
                    # no positional argument and raise TypeError if one is
                    # passed.
                    getattr(g_set.get_entity(URIRef(res)), remove_method)(
                        g_set.get_entity(URIRef(object))
                    )
                else:
                    getattr(g_set.get_entity(URIRef(res)), remove_method)(object)
            else:
                getattr(g_set.get_entity(URIRef(res)), remove_method)()
        else:
            query = f"SELECT ?s WHERE {{?s ?p <{res}>.}}"
            with SPARQLClient(self.endpoint, max_retries=3, backoff_factor=0.3, timeout=3600) as client:
                result = client.query(query)
            for entity in result["results"]["bindings"]:
                self.reader.import_entity_from_triplestore(
                    g_set,
                    self.endpoint,
                    URIRef(entity["s"]["value"]),
                    self.resp_agent,
                    enable_validation=False,
                )
            entity_to_purge = g_set.get_entity(URIRef(res))
            entity_to_purge.mark_as_to_be_deleted()
        self.save(g_set, supplier_prefix)
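
    # Usage sketches (hypothetical IRIs). Deleting a single triple vs.
    # purging an entity along with every inbound reference to it:
    #
    #   editor.delete(
    #       "https://w3id.org/oc/meta/br/06101",
    #       property="http://purl.org/spar/datacite/hasIdentifier",
    #       object="https://w3id.org/oc/meta/id/06101",
    #   )
    #   editor.delete("https://w3id.org/oc/meta/br/06101")  # purge entirely
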
    def merge(self, g_set: GraphSet, res: URIRef, other: URIRef) -> None:
        """
        Merge two entities and their related entities using batch import with caching.

        Args:
            g_set: The GraphSet containing the entities
            res: The main entity that will absorb the other
            other: The entity to be merged into the main one
        """
        # First get all related entities with a single SPARQL query
        related_entities = set()
        with SPARQLClient(self.endpoint, max_retries=5, backoff_factor=0.3, timeout=3600) as client:
            if other in self.relationship_cache:
                related_entities.update(self.relationship_cache[other])
            else:
                query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    {{?entity ?p <{other}>}} UNION
                    {{<{other}> ?p ?entity}}
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}"""

                data = client.query(query)
                other_related = {
                    URIRef(result["entity"]["value"])
                    for result in data["results"]["bindings"]
                    if result["entity"]["type"] == "uri"
                }

                self.relationship_cache[other] = other_related
                related_entities.update(other_related)

            if res in self.relationship_cache:
                related_entities.update(self.relationship_cache[res])
            else:
                # Query only for objects of the surviving entity if not in cache
                query = f"""
                PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                PREFIX datacite: <http://purl.org/spar/datacite/>
                PREFIX pro: <http://purl.org/spar/pro/>
                SELECT DISTINCT ?entity WHERE {{
                    <{res}> ?p ?entity
                    FILTER (?p != rdf:type)
                    FILTER (?p != datacite:usesIdentifierScheme)
                    FILTER (?p != pro:withRole)
                }}"""

                data = client.query(query)
                res_related = {
                    URIRef(result["entity"]["value"])
                    for result in data["results"]["bindings"]
                    if result["entity"]["type"] == "uri"
                }

                self.relationship_cache[res] = res_related
                related_entities.update(res_related)

        entities_to_import = {res, other}
        entities_to_import.update(related_entities)
        entities_to_import = {
            e for e in entities_to_import if not self.entity_cache.is_cached(e)
        }
        # Import only non-cached entities if there are any
        if entities_to_import:
            try:
                self.reader.import_entities_from_triplestore(
                    g_set=g_set,
                    ts_url=self.endpoint,
                    entities=list(entities_to_import),
                    resp_agent=self.resp_agent,
                    enable_validation=False,
                    batch_size=10,
                )

                # Add newly imported entities to cache
                for entity in entities_to_import:
                    self.entity_cache.add(entity)

            except ValueError as e:
                print(f"Error importing entities: {e}")
                return

        # Perform the merge
        res_as_entity = g_set.get_entity(res)
        other_as_entity = g_set.get_entity(other)

        is_both_expression = all(
            GraphEntity.iri_expression in entity.g.objects(entity.res, RDF.type)
            for entity in [res_as_entity, other_as_entity]
        )

        if is_both_expression:
            res_as_entity.merge(other_as_entity, prefer_self=True)
        else:
            res_as_entity.merge(other_as_entity)
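
    # Usage sketch (hypothetical IRIs; the caller owns the GraphSet and is
    # expected to call save() afterwards):
    #
    #   g_set = GraphSet(editor.base_iri, custom_counter_handler=editor.counter_handler)
    #   editor.merge(
    #       g_set,
    #       URIRef("https://w3id.org/oc/meta/ra/06101"),
    #       URIRef("https://w3id.org/oc/meta/ra/06102"),
    #   )
    #   editor.save(g_set)
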
    def sync_rdf_with_triplestore(self, res: str, source_uri: str | None = None) -> bool:
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res, self.resp_agent, enable_validation=False
            )
            self.save(g_set, supplier_prefix)
            return True
        except ValueError:
            try:
                self.reader.import_entity_from_triplestore(
                    g_set,
                    self.endpoint,
                    source_uri,
                    self.resp_agent,
                    enable_validation=False,
                )
            except ValueError:
                res_filepath = self.find_file(
                    self.base_dir,
                    self.dir_split,
                    self.n_file_item,
                    source_uri,
                    self.zip_output_rdf,
                )
                if not res_filepath:
                    return False
                imported_graph = self.reader.load(res_filepath)
                self.reader.import_entities_from_graph(
                    g_set, imported_graph, self.resp_agent
                )
            res_entity = g_set.get_entity(URIRef(source_uri))
            if res_entity:
                # Remove every triple having source_uri as subject from the
                # imported graphs
                for _, entity in g_set.res_to_entity.items():
                    triples_list = list(
                        entity.g.triples((URIRef(source_uri), None, None))
                    )
                    for triple in triples_list:
                        entity.g.remove(triple)
                self.save(g_set, supplier_prefix)
            return False
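
    # Returns True when `res` is still resolvable in the triplestore (and is
    # re-saved as-is); False when it is gone and any triples about
    # `source_uri` had to be scrubbed from the recovered graphs.
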
    def save(self, g_set: GraphSet, supplier_prefix: str = "") -> None:
        provset = ProvSet(
            g_set,
            self.base_iri,
            wanted_label=False,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        provset.generate_provenance()
        graph_storer = Storer(
            g_set,
            dir_split=self.dir_split,
            n_file_item=self.n_file_item,
            zip_output=self.zip_output_rdf,
        )
        prov_storer = Storer(
            provset,
            dir_split=self.dir_split,
            n_file_item=self.n_file_item,
            zip_output=self.zip_output_rdf,
        )

        if self.generate_rdf_files:
            graph_storer.store_all(self.base_dir, self.base_iri)
            prov_storer.store_all(self.base_dir, self.base_iri)
        graph_storer.upload_all(
            self.endpoint, base_dir=self.data_hotfix_dir, save_queries=self.save_queries
        )
        prov_storer.upload_all(
            self.provenance_endpoint, base_dir=self.prov_hotfix_dir, save_queries=self.save_queries
        )
        g_set.commit_changes()
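
    # Ordering note: provenance is generated from the pending changes, both
    # graphs are (optionally) written to file and uploaded, and only then is
    # commit_changes() called to finalize the GraphSet's staged state.
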
    def __get_supplier_prefix(self, uri: str) -> str:
        entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?([1-9][0-9]*)$"
        entity_match = re.match(entity_regex, uri)
        return entity_match.group(3)
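
    # Worked example: for "https://w3id.org/oc/meta/br/06101" the regex
    # captures group(2) == "br" (entity type), group(3) == "0610" (the
    # supplier prefix returned here), and group(4) == "1" (sequential number).
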
    def find_file(
        self,
        rdf_dir: str,
        dir_split_number: int,
        items_per_file: int,
        uri: str,
        zip_output_rdf: bool,
    ) -> str | None:
        entity_regex: str = (
            r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
        )
        entity_match = re.match(entity_regex, uri)
        if entity_match:
            cur_number = int(entity_match.group(4))
            cur_file_split: int = 0
            while True:
                if cur_number > cur_file_split:
                    cur_file_split += items_per_file
                else:
                    break
            cur_split: int = 0
            while True:
                if cur_number > cur_split:
                    cur_split += dir_split_number
                else:
                    break
            short_name = entity_match.group(2)
            sub_folder = entity_match.group(3)
            cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
            extension = ".zip" if zip_output_rdf else ".json"
            cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + extension
            return cur_file_path
        return None
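
    # Worked example (hypothetical settings dir_split_number=10000,
    # items_per_file=1000): for ".../br/0610123", cur_number is 123, which
    # rounds up to file split 1000 and directory split 10000, yielding
    # rdf_dir/br/0610/10000/1000.zip (or .json when zip_output_rdf is False).
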
    def infer_type_from_uri(self, uri: str) -> str | None:
        if os.path.join(self.base_iri, "br") in uri:
            return GraphEntity.iri_expression
        elif os.path.join(self.base_iri, "ar") in uri:
            return GraphEntity.iri_role_in_time
        elif os.path.join(self.base_iri, "ra") in uri:
            return GraphEntity.iri_agent
        elif os.path.join(self.base_iri, "re") in uri:
            return GraphEntity.iri_manifestation
        elif os.path.join(self.base_iri, "id") in uri:
            return GraphEntity.iri_identifier
        return None
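
    # OC Meta URIs encode the entity type in a path segment, so the type can
    # be recovered from the URI alone, e.g. ".../br/06101" maps to
    # GraphEntity.iri_expression and ".../id/06101" to
    # GraphEntity.iri_identifier.
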
    def add_entity_with_type(
        self,
        g_set: GraphSet,
        res: str,
        entity_type: str,
        preexisting_graph: Graph,
    ):
        subject = URIRef(res)
        if entity_type == GraphEntity.iri_expression:
            g_set.add_br(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_role_in_time:
            g_set.add_ar(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_agent:
            g_set.add_ra(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_manifestation:
            g_set.add_re(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_identifier:
            g_set.add_id(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
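
# End-to-end usage sketch (hypothetical config path, ORCID and IRIs; assumes
# a populated triplestore and a Redis instance matching meta_config.yaml):
#
#   editor = MetaEditor("meta_config.yaml", "https://orcid.org/0000-0000-0000-0000")
#   editor.update_property(
#       URIRef("https://w3id.org/oc/meta/br/06101"), "has_title", "Corrected title"
#   )
#   editor.delete("https://w3id.org/oc/meta/id/06102")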