#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import os
import re
from time import sleep, time
from typing import Set

import validators
import yaml
from oc_ocdm import Storer
from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
from oc_ocdm.graph import GraphSet
from oc_ocdm.graph.graph_entity import GraphEntity
from oc_ocdm.prov import ProvSet
from oc_ocdm.reader import Reader
from oc_ocdm.support.support import build_graph_from_results
from rdflib import RDF, ConjunctiveGraph, URIRef
from SPARQLWrapper import JSON, SPARQLWrapper

class EntityCache:
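    """Simple in-memory cache of entity URIs, used to avoid re-importing
    entities that are already present in the working GraphSet."""
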
    def __init__(self):
        self.cache: Set[URIRef] = set()

    def add(self, entity: URIRef) -> None:
        """Add an entity to the cache."""
        self.cache.add(entity)

    def is_cached(self, entity: URIRef) -> bool:
        """Check if an entity is in the cache."""
        return entity in self.cache

    def clear(self) -> None:
        """Clear all cached entities."""
        self.cache.clear()

class MetaEditor:
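    """Editor for OpenCitations Meta entities: property updates, deletions,
    merges, and synchronisation between the triplestore and the RDF file dump.

    Illustrative usage (the ORCID, entity URI, and title below are placeholders)::

        editor = MetaEditor("meta_config.yaml", "https://orcid.org/0000-0000-0000-0000")
        editor.update_property(
            URIRef("https://w3id.org/oc/meta/br/06101"), "has_title", "New title"
        )
    """
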
    # Full property IRIs mapped to the oc_ocdm removal method to call; any other
    # property name is derived in delete() via the has_* -> remove_* convention
    property_to_remove_method = {
        "http://purl.org/spar/datacite/hasIdentifier": "remove_identifier",
        "http://purl.org/spar/pro/isHeldBy": "remove_is_held_by",
        "http://purl.org/vocab/frbr/core#embodiment": "remove_format",
        "http://purl.org/spar/pro/isDocumentContextFor": "remove_is_held_by",
        "https://w3id.org/oc/ontology/hasNext": "remove_next",
    }

    def __init__(self, meta_config: str, resp_agent: str, save_queries: bool = False):
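        """Read the Meta YAML configuration at ``meta_config`` and set up the data
        and provenance endpoints, RDF output options, the Redis-backed counter
        handler, and the entity/relationship caches. ``resp_agent`` is the URI of
        the responsible agent recorded in provenance."""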
        with open(meta_config, encoding="utf-8") as file:
            settings = yaml.full_load(file)
        self.endpoint = settings["triplestore_url"]
        self.provenance_endpoint = settings["provenance_triplestore_url"]
        output_dir = settings.get("base_output_dir")
        self.base_dir = os.path.join(output_dir, "rdf") + os.sep
        self.base_iri = settings["base_iri"]
        self.resp_agent = resp_agent
        self.dir_split = settings["dir_split_number"]
        self.n_file_item = settings["items_per_file"]
        self.zip_output_rdf = settings["zip_output_rdf"]
        self.generate_rdf_files = settings.get("generate_rdf_files", True)
        self.reader = Reader()
        self.save_queries = save_queries
        self.update_queries = []

        # Redis configuration
        self.redis_host = settings.get("redis_host", "localhost")
        self.redis_port = settings.get("redis_port", 6379)
        self.redis_db = settings.get("redis_db", 5)
        self.counter_handler = RedisCounterHandler(
            host=self.redis_host, port=self.redis_port, db=self.redis_db
        )

        self.entity_cache = EntityCache()
        self.relationship_cache = {}

    def make_sparql_query_with_retry(
        self, sparql: SPARQLWrapper, query, max_retries=5, backoff_factor=0.3
    ):
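        """Execute ``query`` on ``sparql`` and return the JSON-converted result,
        retrying up to ``max_retries`` times with exponential backoff
        (``backoff_factor * 2 ** attempt`` seconds between attempts)."""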
        sparql.setQuery(query)
        sparql.setReturnFormat(JSON)

        start_time = time()
        for attempt in range(max_retries):
            try:
                return sparql.queryAndConvert()
            except Exception as e:
                duration = time() - start_time
                if attempt < max_retries - 1:
                    sleep_time = backoff_factor * (2**attempt)
                    print(query, duration, f"retry_{attempt + 1}")
                    sleep(sleep_time)
                else:
                    print(f"SPARQL query failed after {max_retries} attempts: {e}")
                    raise

    def update_property(
        self, res: URIRef, property: str, new_value: str | URIRef
    ) -> None:
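        """Import ``res`` from the triplestore and call the setter named by
        ``property`` on it with ``new_value``. When ``new_value`` is a URL, the
        referenced entity is imported too and passed as the object. The change
        is persisted through save()."""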
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        self.reader.import_entity_from_triplestore(
            g_set, self.endpoint, res, self.resp_agent, enable_validation=False
        )
        if validators.url(new_value):
            new_value = URIRef(new_value)
            self.reader.import_entity_from_triplestore(
                g_set,
                self.endpoint,
                new_value,
                self.resp_agent,
                enable_validation=False,
            )
            getattr(g_set.get_entity(res), property)(g_set.get_entity(new_value))
        else:
            getattr(g_set.get_entity(res), property)(new_value)
        self.save(g_set, supplier_prefix)

    def delete(self, res: str, property: str = None, object: str = None) -> None:
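        """Remove ``property`` (optionally only the statement whose object is
        ``object``) from ``res``, or, when no property is given, import every
        entity still referencing ``res`` and mark ``res`` itself as deleted."""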
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res, self.resp_agent, enable_validation=False
            )
        except ValueError as e:
            print(f"ValueError for entity {res}: {e}")
            inferred_type = self.infer_type_from_uri(res)
            if inferred_type:
                print(f"Inferred type {inferred_type} for entity {res}")
                sparql: SPARQLWrapper = SPARQLWrapper(self.endpoint)
                query: str = (
                    f"SELECT ?s ?p ?o WHERE {{BIND (<{res}> AS ?s). ?s ?p ?o.}}"
                )
                sparql.setQuery(query)
                sparql.setMethod("GET")
                sparql.setReturnFormat(JSON)
                result = sparql.queryAndConvert()["results"]["bindings"]
                preexisting_graph: ConjunctiveGraph = build_graph_from_results(result)
                self.add_entity_with_type(g_set, res, inferred_type, preexisting_graph)
            else:
                return
        if not g_set.get_entity(URIRef(res)):
            return
        if property:
            remove_method = (
                self.property_to_remove_method[property]
                if property in self.property_to_remove_method
                else (
                    property.replace("has", "remove")
                    if property.startswith("has")
                    else f"remove_{property}"
                )
            )
            if object:
                if validators.url(object):
                    self.reader.import_entity_from_triplestore(
                        g_set,
                        self.endpoint,
                        object,
                        self.resp_agent,
                        enable_validation=False,
                    )
                    # Known quirk: AgentRole.remove_is_held_by() takes no argument,
                    # so this call can raise TypeError for that removal method
                    getattr(g_set.get_entity(URIRef(res)), remove_method)(
                        g_set.get_entity(URIRef(object))
                    )
                else:
                    getattr(g_set.get_entity(URIRef(res)), remove_method)(object)
            else:
                getattr(g_set.get_entity(URIRef(res)), remove_method)()
        else:
            sparql = SPARQLWrapper(endpoint=self.endpoint)
            query = f"SELECT ?s WHERE {{?s ?p <{res}>.}}"
            sparql.setQuery(query)
            sparql.setReturnFormat(JSON)
            result = sparql.queryAndConvert()
            for entity in result["results"]["bindings"]:
                self.reader.import_entity_from_triplestore(
                    g_set,
                    self.endpoint,
                    URIRef(entity["s"]["value"]),
                    self.resp_agent,
                    enable_validation=False,
                )
            entity_to_purge = g_set.get_entity(URIRef(res))
            entity_to_purge.mark_as_to_be_deleted()
        self.save(g_set, supplier_prefix)

    def merge(self, g_set: GraphSet, res: URIRef, other: URIRef) -> None:
        """
        Merge two entities and their related entities using batch import with caching.

        Args:
            g_set: The GraphSet containing the entities
            res: The main entity that will absorb the other
            other: The entity to be merged into the main one
        """
        # First get all related entities with a single SPARQL query
        related_entities = set()

        if other in self.relationship_cache:
            related_entities.update(self.relationship_cache[other])
        else:
            sparql = SPARQLWrapper(endpoint=self.endpoint)
            query = f"""
            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
            PREFIX datacite: <http://purl.org/spar/datacite/>
            PREFIX pro: <http://purl.org/spar/pro/>
            SELECT DISTINCT ?entity WHERE {{
                {{?entity ?p <{other}>}} UNION
                {{<{other}> ?p ?entity}}
                FILTER (?p != rdf:type)
                FILTER (?p != datacite:usesIdentifierScheme)
                FILTER (?p != pro:withRole)
            }}"""
            data = self.make_sparql_query_with_retry(sparql, query)
            other_related = {
                URIRef(result["entity"]["value"])
                for result in data["results"]["bindings"]
                if result["entity"]["type"] == "uri"
            }
            self.relationship_cache[other] = other_related
            related_entities.update(other_related)

        if res in self.relationship_cache:
            related_entities.update(self.relationship_cache[res])
        else:
            # Query only for objects of the surviving entity if not in cache
            sparql = SPARQLWrapper(endpoint=self.endpoint)
            query = f"""
            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
            PREFIX datacite: <http://purl.org/spar/datacite/>
            PREFIX pro: <http://purl.org/spar/pro/>
            SELECT DISTINCT ?entity WHERE {{
                <{res}> ?p ?entity
                FILTER (?p != rdf:type)
                FILTER (?p != datacite:usesIdentifierScheme)
                FILTER (?p != pro:withRole)
            }}"""
            data = self.make_sparql_query_with_retry(sparql, query)
            res_related = {
                URIRef(result["entity"]["value"])
                for result in data["results"]["bindings"]
                if result["entity"]["type"] == "uri"
            }
            self.relationship_cache[res] = res_related
            related_entities.update(res_related)

        entities_to_import = {res, other}
        entities_to_import.update(related_entities)
        entities_to_import = {
            e for e in entities_to_import if not self.entity_cache.is_cached(e)
        }

        # Import only non-cached entities if there are any
        if entities_to_import:
            try:
                self.reader.import_entities_from_triplestore(
                    g_set=g_set,
                    ts_url=self.endpoint,
                    entities=list(entities_to_import),
                    resp_agent=self.resp_agent,
                    enable_validation=False,
                    batch_size=10,
                )
                # Add newly imported entities to cache
                for entity in entities_to_import:
                    self.entity_cache.add(entity)
            except ValueError as e:
                print(f"Error importing entities: {e}")
                return

        # Perform the merge
        res_as_entity = g_set.get_entity(res)
        other_as_entity = g_set.get_entity(other)

        is_both_expression = all(
            GraphEntity.iri_expression in entity.g.objects(entity.res, RDF.type)
            for entity in [res_as_entity, other_as_entity]
        )

        if is_both_expression:
            res_as_entity.merge(other_as_entity, prefer_self=True)
        else:
            res_as_entity.merge(other_as_entity)

    def sync_rdf_with_triplestore(self, res: str, source_uri: str = None) -> bool:
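        """Re-save ``res`` from the triplestore so the RDF files reflect its
        current state, returning True on success. When ``res`` is not in the
        triplestore, fall back to ``source_uri`` (from the triplestore or from
        the file dump), remove the triples it is subject of, save, and return
        False."""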
        supplier_prefix = self.__get_supplier_prefix(res)
        g_set = GraphSet(
            self.base_iri,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        try:
            self.reader.import_entity_from_triplestore(
                g_set, self.endpoint, res, self.resp_agent, enable_validation=False
            )
            self.save(g_set, supplier_prefix)
            return True
        except ValueError:
            try:
                self.reader.import_entity_from_triplestore(
                    g_set,
                    self.endpoint,
                    source_uri,
                    self.resp_agent,
                    enable_validation=False,
                )
            except ValueError:
                res_filepath = self.find_file(
                    self.base_dir,
                    self.dir_split,
                    self.n_file_item,
                    source_uri,
                    self.zip_output_rdf,
                )
                if not res_filepath:
                    return False
                imported_graph = self.reader.load(res_filepath)
                self.reader.import_entities_from_graph(
                    g_set, imported_graph, self.resp_agent
                )
            res_entity = g_set.get_entity(URIRef(source_uri))
            if res_entity:
                # Named entity_res to avoid shadowing the res parameter
                for entity_res, entity in g_set.res_to_entity.items():
                    triples_list = list(
                        entity.g.triples((URIRef(source_uri), None, None))
                    )
                    for triple in triples_list:
                        entity.g.remove(triple)
            self.save(g_set, supplier_prefix)
            return False

    def save(self, g_set: GraphSet, supplier_prefix: str = "") -> None:
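        """Generate provenance for the pending changes in ``g_set``, optionally
        serialise graphs and provenance to the RDF file dump, upload both to the
        data and provenance endpoints, and commit the changes."""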
        provset = ProvSet(
            g_set,
            self.base_iri,
            wanted_label=False,
            supplier_prefix=supplier_prefix,
            custom_counter_handler=self.counter_handler,
        )
        provset.generate_provenance()
        graph_storer = Storer(
            g_set,
            dir_split=self.dir_split,
            n_file_item=self.n_file_item,
            zip_output=self.zip_output_rdf,
        )
        prov_storer = Storer(
            provset,
            dir_split=self.dir_split,
            n_file_item=self.n_file_item,
            zip_output=self.zip_output_rdf,
        )

        if self.generate_rdf_files:
            graph_storer.store_all(self.base_dir, self.base_iri)
            prov_storer.store_all(self.base_dir, self.base_iri)
        graph_storer.upload_all(
            self.endpoint, base_dir=self.base_dir, save_queries=self.save_queries
        )
        prov_storer.upload_all(
            self.provenance_endpoint,
            base_dir=self.base_dir,
            save_queries=self.save_queries,
        )
        g_set.commit_changes()

    def __get_supplier_prefix(self, uri: str) -> str:
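        """Extract the supplier prefix (the optional ``0[1-9]+0`` block preceding
        the sequential number) from an OpenCitations Meta entity URI."""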
        entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?([1-9][0-9]*)$"
        entity_match = re.match(entity_regex, uri)
        return entity_match.group(3)

    def find_file(
        self,
        rdf_dir: str,
        dir_split_number: int,
        items_per_file: int,
        uri: str,
        zip_output_rdf: bool,
    ) -> str | None:
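        """Compute the path of the RDF dump file that should contain ``uri``,
        following Meta's directory-split layout, or return None when the URI
        does not match the expected pattern."""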
        entity_regex: str = (
            r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
        )
        entity_match = re.match(entity_regex, uri)
        if entity_match:
            cur_number = int(entity_match.group(4))

            # Find the smallest file-split and dir-split multiples >= cur_number
            cur_file_split: int = 0
            while True:
                if cur_number > cur_file_split:
                    cur_file_split += items_per_file
                else:
                    break
            cur_split: int = 0
            while True:
                if cur_number > cur_split:
                    cur_split += dir_split_number
                else:
                    break

            short_name = entity_match.group(2)
            sub_folder = entity_match.group(3)
            cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
            extension = ".zip" if zip_output_rdf else ".json"
            cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + extension
            return cur_file_path
        return None

    def infer_type_from_uri(self, uri: str) -> URIRef | None:
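        """Infer the OCDM type of an entity from the short name in its URI
        (br, ar, ra, re, or id), or return None when none matches."""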
        if os.path.join(self.base_iri, "br") in uri:
            return GraphEntity.iri_expression
        elif os.path.join(self.base_iri, "ar") in uri:
            return GraphEntity.iri_role_in_time
        elif os.path.join(self.base_iri, "ra") in uri:
            return GraphEntity.iri_agent
        elif os.path.join(self.base_iri, "re") in uri:
            return GraphEntity.iri_manifestation
        elif os.path.join(self.base_iri, "id") in uri:
            return GraphEntity.iri_identifier
        return None

    def add_entity_with_type(
        self,
        g_set: GraphSet,
        res: str,
        entity_type: str,
        preexisting_graph: ConjunctiveGraph,
    ):
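        """Recreate ``res`` in ``g_set`` with the given OCDM type and its
        preexisting triples, dispatching to the matching GraphSet factory."""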
        subject = URIRef(res)
        if entity_type == GraphEntity.iri_expression:
            g_set.add_br(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_role_in_time:
            g_set.add_ar(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_agent:
            g_set.add_ra(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_manifestation:
            g_set.add_re(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )
        elif entity_type == GraphEntity.iri_identifier:
            g_set.add_id(
                resp_agent=self.resp_agent,
                res=subject,
                preexisting_graph=preexisting_graph,
            )