Coverage for rdflib_ocdm / reader.py: 100%

56 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-21 12:35 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2023-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7from __future__ import annotations 

8 

9from typing import List, Union 

10 

11from oc_ocdm.support.reporter import Reporter 

12from rdflib import Dataset, Graph, Literal, URIRef 

13from SPARQLWrapper import JSON, POST, XML, SPARQLWrapper 

14 

15from rdflib_ocdm.ocdm_graph import OCDMDataset, OCDMGraph 

16from rdflib_ocdm.retry_utils import execute_with_retry 

17 

18 

class Reader(object):
    """Import the triples or quads of selected entities from a SPARQL endpoint.

    The instance only carries two ``Reporter`` objects (``repok`` for
    informational messages, ``reperr`` for errors); the actual import logic
    lives in the static method :meth:`import_entities_from_triplestore`.
    """

    def __init__(self, repok: Union[Reporter, None] = None, reperr: Union[Reporter, None] = None):
        """Store the given reporters, creating default ones when omitted.

        :param repok: reporter for informational messages; when ``None`` a new
            ``Reporter`` with prefix ``"[Reader: INFO] "`` is created.
        :param reperr: reporter for error messages; when ``None`` a new
            ``Reporter`` with prefix ``"[Reader: ERROR] "`` is created.
        """
        self.repok: Reporter = Reporter(prefix="[Reader: INFO] ") if repok is None else repok
        self.reperr: Reporter = Reporter(prefix="[Reader: ERROR] ") if reperr is None else reperr

    @staticmethod
    def _object_from_binding(binding: dict):
        """Build the rdflib term for the ``?o`` variable of one SELECT binding.

        ``binding`` is one entry of ``results.bindings`` in a SPARQL JSON
        result; its ``'o'`` entry carries ``type``, ``value`` and optionally
        ``datatype``, while the language tag is read from the ``?lang``
        variable projected by the query.
        """
        # Imported locally because the module-level rdflib import does not
        # include BNode.
        from rdflib import BNode

        obj_data = binding['o']
        term_type = obj_data['type']
        value = obj_data['value']

        if term_type == 'uri':
            return URIRef(value)
        if term_type == 'bnode':
            # Blank nodes must stay blank nodes; turning them into plain
            # literals would silently corrupt the imported data.
            return BNode(value)

        # Anything else is a literal ('literal' or the legacy 'typed-literal').
        lang = binding.get('lang', {}).get('value')
        datatype = obj_data.get('datatype')
        if lang:
            return Literal(value, lang=lang)
        if datatype:
            return Literal(value, datatype=URIRef(datatype))
        return Literal(value)

    @staticmethod
    def import_entities_from_triplestore(ocdm_graph: Union[OCDMGraph, OCDMDataset], ts_url: str, res_list: List[URIRef], max_retries: int = 5) -> None:
        """Fetch every statement whose subject is in ``res_list`` and add it to ``ocdm_graph``.

        For an ``OCDMDataset`` a SELECT query over named graphs is issued and
        each binding is added as a quad; for an ``OCDMGraph`` a CONSTRUCT
        query is issued and each resulting triple is added.

        :param ocdm_graph: destination graph or dataset, modified in place.
        :param ts_url: URL of the SPARQL endpoint.
        :param res_list: IRIs of the subjects to import. They are interpolated
            into the query text as-is, without escaping.
        :param max_retries: forwarded to ``execute_with_retry`` as ``max_retries``.
        :raises TypeError: if ``ocdm_graph`` is neither an ``OCDMGraph`` nor an
            ``OCDMDataset``.
        :raises ValueError: if the endpoint returns no usable result (dataset
            case: missing ``results``/``bindings``; graph case: ``None`` or an
            empty graph).
        """
        to_dataset = isinstance(ocdm_graph, OCDMDataset)
        if not to_dataset and not isinstance(ocdm_graph, OCDMGraph):
            raise TypeError("ocdm_graph must be either OCDMGraph or OCDMDataset")

        sparql: SPARQLWrapper = SPARQLWrapper(ts_url)
        # Space-separated list of <iri> terms for the VALUES clause.
        subjects: str = f"<{'> <'.join(res_list)}>"

        if to_dataset:
            select_query: str = f'''
                SELECT ?g ?s ?p ?o (LANG(?o) AS ?lang)
                WHERE {{
                    GRAPH ?g {{
                        ?s ?p ?o.
                        VALUES ?s {{{subjects}}}
                    }}
                }}
            '''
            sparql.setQuery(select_query)
            sparql.setMethod(POST)
            sparql.setReturnFormat(JSON)

            select_result = execute_with_retry(
                sparql.queryAndConvert,
                max_retries=max_retries
            )

            if not (select_result and 'results' in select_result and 'bindings' in select_result['results']):
                raise ValueError("No entities were found.")

            # Quads are staged in a temporary Dataset and then copied over
            # through its quads() iterator.
            temp_graph = Dataset()
            for binding in select_result['results']['bindings']:
                context = Graph(identifier=URIRef(binding['g']['value']))
                subject = URIRef(binding['s']['value'])
                predicate = URIRef(binding['p']['value'])
                obj = Reader._object_from_binding(binding)
                temp_graph.add((subject, predicate, obj, context))

            for quad in temp_graph.quads():
                ocdm_graph.add(quad)
            return

        construct_query: str = f'''
            CONSTRUCT {{
                ?s ?p ?o
            }}
            WHERE {{
                ?s ?p ?o.
                VALUES ?s {{{subjects}}}
            }}
        '''
        sparql.setQuery(construct_query)
        sparql.setMethod(POST)
        sparql.setReturnFormat(XML)

        result: Graph = execute_with_retry(
            sparql.queryAndConvert,
            max_retries=max_retries
        )

        if result is None or len(result) == 0:
            raise ValueError("No entities were found.")

        for triple in result:
            ocdm_graph.add(triple)