Coverage for rdflib_ocdm / reader.py: 100%
56 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 12:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 12:35 +0000
1#!/usr/bin/python
3# SPDX-FileCopyrightText: 2023-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
4#
5# SPDX-License-Identifier: ISC
from __future__ import annotations

from typing import List, Optional, Union

from oc_ocdm.support.reporter import Reporter
from rdflib import BNode, Dataset, Graph, Literal, URIRef
from SPARQLWrapper import JSON, POST, XML, SPARQLWrapper

from rdflib_ocdm.ocdm_graph import OCDMDataset, OCDMGraph
from rdflib_ocdm.retry_utils import execute_with_retry
class Reader(object):
    """Import the triples/quads describing a set of resources from a SPARQL
    endpoint into an ``OCDMGraph`` or ``OCDMDataset``."""

    def __init__(self, repok: Optional[Reporter] = None, reperr: Optional[Reporter] = None):
        """Store the reporters used for informational and error messages.

        :param repok: reporter for informational messages; when ``None`` a new
            ``Reporter`` with prefix ``"[Reader: INFO] "`` is created.
        :param reperr: reporter for error messages; when ``None`` a new
            ``Reporter`` with prefix ``"[Reader: ERROR] "`` is created.
        """
        self.repok: Reporter = Reporter(prefix="[Reader: INFO] ") if repok is None else repok
        self.reperr: Reporter = Reporter(prefix="[Reader: ERROR] ") if reperr is None else reperr

    @staticmethod
    def _object_from_binding(binding: dict) -> Union[URIRef, BNode, Literal]:
        """Build the RDF term for the ``?o`` variable of one SPARQL JSON binding.

        ``uri`` terms become ``URIRef`` and ``bnode`` terms become ``BNode``;
        every other term type is treated as a literal, preferring the language
        tag (``?lang``) over the datatype when both are present.
        """
        obj_data = binding['o']
        term_type = obj_data['type']
        value = obj_data['value']

        if term_type == 'uri':
            return URIRef(value)
        if term_type == 'bnode':
            # Blank nodes must not be downgraded to plain literals.
            return BNode(value)

        lang = binding.get('lang', {}).get('value')
        if lang:
            return Literal(value, lang=lang)
        datatype = obj_data.get('datatype')
        if datatype:
            return Literal(value, datatype=URIRef(datatype))
        return Literal(value)

    @staticmethod
    def import_entities_from_triplestore(
        ocdm_graph: Union[OCDMGraph, OCDMDataset],
        ts_url: str,
        res_list: List[URIRef],
        max_retries: int = 5,
    ) -> None:
        """Fetch every statement whose subject is in ``res_list`` and add it to ``ocdm_graph``.

        For an ``OCDMDataset`` a ``SELECT`` over named graphs is issued and the
        resulting quads are added; for an ``OCDMGraph`` a ``CONSTRUCT`` is issued
        and the resulting triples are added. Both queries are sent with POST and
        run through ``execute_with_retry`` with the given ``max_retries``.

        :param ocdm_graph: destination graph or dataset (modified in place).
        :param ts_url: URL of the SPARQL endpoint.
        :param res_list: IRIs of the subjects to import.
        :param max_retries: forwarded to ``execute_with_retry``.
        :raises ValueError: if ``res_list`` is empty or the endpoint returns no
            statements for the requested subjects.
        :raises TypeError: if ``ocdm_graph`` is neither an ``OCDMGraph`` nor an
            ``OCDMDataset``.
        """
        # Materialise once so the emptiness check also works for one-shot iterables.
        resources = list(res_list)
        if not resources:
            # Nothing to ask for: avoid sending a query with an empty IRI (`<>`).
            raise ValueError("No entities were found.")

        values_clause = '<' + '> <'.join(resources) + '>'

        sparql: SPARQLWrapper = SPARQLWrapper(ts_url)
        sparql.setMethod(POST)

        if isinstance(ocdm_graph, OCDMDataset):
            sparql.setQuery(f'''
                SELECT ?g ?s ?p ?o (LANG(?o) AS ?lang)
                WHERE {{
                    GRAPH ?g {{
                        ?s ?p ?o.
                        VALUES ?s {{ {values_clause} }}
                    }}
                }}
            ''')
            sparql.setReturnFormat(JSON)

            result = execute_with_retry(
                sparql.queryAndConvert,
                max_retries=max_retries
            )

            bindings = None
            if result and 'results' in result and 'bindings' in result['results']:
                bindings = result['results']['bindings']
            # An empty bindings list means no match, exactly like an empty
            # CONSTRUCT result in the OCDMGraph branch below.
            if not bindings:
                raise ValueError("No entities were found.")

            # NOTE(review): the round-trip through a plain Dataset is kept from
            # the original implementation; presumably it de-duplicates and
            # normalises the quads before they reach ocdm_graph - confirm
            # before removing it.
            temp_graph = Dataset()
            for binding in bindings:
                graph_uri = Graph(identifier=URIRef(binding['g']['value']))
                subject = URIRef(binding['s']['value'])
                predicate = URIRef(binding['p']['value'])
                obj = Reader._object_from_binding(binding)
                temp_graph.add((subject, predicate, obj, graph_uri))

            for quad in temp_graph.quads():
                ocdm_graph.add(quad)

        elif isinstance(ocdm_graph, OCDMGraph):
            sparql.setQuery(f'''
                CONSTRUCT {{
                    ?s ?p ?o
                }}
                WHERE {{
                    ?s ?p ?o.
                    VALUES ?s {{ {values_clause} }}
                }}
            ''')
            sparql.setReturnFormat(XML)

            result: Graph = execute_with_retry(
                sparql.queryAndConvert,
                max_retries=max_retries
            )

            if result is None or len(result) == 0:
                raise ValueError("No entities were found.")
            for triple in result:
                ocdm_graph.add(triple)

        else:
            raise TypeError("ocdm_graph must be either OCDMGraph or OCDMDataset")