Coverage for rdflib_ocdm/storer.py: 96%

77 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-21 12:35 +0000

1#!/usr/bin/python 

2 

3# SPDX-FileCopyrightText: 2023-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7from __future__ import annotations 

8 

9import os 

10from datetime import datetime 

11from typing import TYPE_CHECKING 

12 

13from oc_ocdm.support.reporter import Reporter 

14from SPARQLWrapper import SPARQLWrapper 

15 

16from rdflib_ocdm.ocdm_graph import (OCDMDataset, OCDMGraph, 

17 OCDMGraphCommons) 

18from rdflib_ocdm.query_utils import get_update_query 

19from rdflib_ocdm.retry_utils import execute_with_retry 

20 

21if TYPE_CHECKING: 

22 from typing import Set 

23 

24 from rdflib import Graph 

25 

26 

class Storer(object):
    """Persist an OCDM (meta)graph to a SPARQL triplestore.

    The storer serializes pending changes of an ``OCDMGraphCommons`` (or a
    plain rdflib ``Graph``) into SPARQL 1.1 UPDATE operations and sends them
    to a triplestore endpoint, batching several per-entity updates into a
    single request. Successes and failures are recorded through two
    ``Reporter`` instances (``repok`` / ``reperr``).
    """

    def __init__(self, abstract_set: OCDMGraphCommons | Graph, repok: Reporter | None = None,
                 reperr: Reporter | None = None, output_format: str = "json-ld",
                 zip_output: bool = False) -> None:
        """Initialize the storer.

        :param abstract_set: the graph (or OCDM graph/dataset) to persist.
        :param repok: reporter for informational messages; a default
            ``[Storer: INFO]`` reporter is created when ``None``.
        :param reperr: reporter for error messages; a default
            ``[Storer: ERROR]`` reporter is created when ``None``.
        :param output_format: serialization format for file output; must be
            one of the N-Triples / N-Quads / JSON-LD identifiers below.
        :param zip_output: whether file output should be zipped.
        :raises ValueError: if ``output_format`` is not supported.
        """
        self.a_set = abstract_set
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        if output_format not in supported_formats:
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {supported_formats}.")
        # `raise` above exits, so no `else` branch is needed here.
        self.output_format: str = output_format
        self.zip_output = zip_output
        self.repok: Reporter = repok if repok is not None else Reporter(prefix="[Storer: INFO] ")
        self.reperr: Reporter = reperr if reperr is not None else Reporter(prefix="[Storer: ERROR] ")

    def _query(self, query_string: str, triplestore_url: str, base_dir: str = None,
               added_statements: int = 0, removed_statements: int = 0, max_retries: int = 5) -> bool:
        """Send one SPARQL UPDATE request, retrying on transient failures.

        :param query_string: the UPDATE query; an empty string is a no-op
            and returns ``False``.
        :param triplestore_url: SPARQL endpoint URL.
        :param base_dir: when given, failed queries are dumped under
            ``<base_dir>/tp_err/`` for later replay.
        :param added_statements: count used only for the success report.
        :param removed_statements: count used only for the success report.
        :param max_retries: forwarded to ``execute_with_retry``.
        :return: ``True`` on success, ``False`` on failure or empty query.
        """
        if query_string == "":
            return False
        try:
            # Closure handed to the retry utility; a fresh SPARQLWrapper is
            # built on every attempt so no stale connection state is reused.
            def execute_query():
                sparql: SPARQLWrapper = SPARQLWrapper(triplestore_url)
                sparql.setQuery(query_string)
                sparql.setMethod('POST')
                sparql.query()
                return True

            execute_with_retry(
                execute_query,
                max_retries=max_retries,
                reporter=self.repok
            )

            self.repok.add_sentence(
                f"Triplestore updated with {added_statements} added statements and "
                f"with {removed_statements} removed statements.")
            return True
        except ValueError as e:
            # execute_with_retry signals exhaustion of all retries with a
            # ValueError (presumably — confirm against retry_utils).
            self.reperr.add_sentence(f"[3] Graph was not loaded into the triplestore due to communication problems: {e}")
            if base_dir is not None:
                tp_err_dir: str = os.path.join(base_dir, "tp_err")
                # exist_ok avoids the check-then-create race of the previous
                # os.path.exists() guard.
                os.makedirs(tp_err_dir, exist_ok=True)
                cur_file_err: str = os.path.join(
                    tp_err_dir,
                    datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt'))
                with open(cur_file_err, 'wt', encoding='utf-8') as f:
                    f.write(query_string)
            return False

    def upload_all(self, triplestore_url: str, base_dir: str = None, batch_size: int = 10) -> bool:
        """Upload the update queries of every tracked entity, in batches.

        Per-entity UPDATE queries are concatenated with ``;`` (the SPARQL 1.1
        multi-operation separator) and flushed every ``batch_size`` non-empty
        queries, plus one final flush for the remainder.

        :param triplestore_url: SPARQL endpoint URL.
        :param base_dir: forwarded to :meth:`_query` for error dumps.
        :param batch_size: queries per request; non-positive values fall
            back to the default of 10.
        :return: ``True`` only if every batch was uploaded successfully.
        """
        self.repok.new_article()
        self.reperr.new_article()
        if batch_size <= 0:
            batch_size = 10
        query_string: str = ""
        added_statements: int = 0
        removed_statements: int = 0
        skipped_queries: int = 0
        result: bool = True
        # Graph-like sets produce 'graph' updates; anything else is treated
        # as provenance.
        entity_type = 'graph' if isinstance(self.a_set, (OCDMGraph, OCDMDataset)) else 'prov'
        for idx, entity in enumerate(list(self.a_set.all_entities)):
            update_query, n_added, n_removed = get_update_query(self.a_set, entity, entity_type)
            if update_query == "":
                # Entities with nothing to persist must not count toward the
                # batch index, otherwise batches would flush early.
                skipped_queries += 1
                continue
            index = idx - skipped_queries
            if index == 0:
                # First non-empty query: start the first batch.
                query_string = update_query
                added_statements = n_added
                removed_statements = n_removed
            elif index % batch_size == 0:
                # Batch boundary: flush the accumulated batch, start a new one.
                result &= self._query(query_string, triplestore_url, base_dir,
                                      added_statements, removed_statements)
                query_string = update_query
                added_statements = n_added
                removed_statements = n_removed
            else:
                # Accumulate into the current batch.
                query_string += " ; " + update_query
                added_statements += n_added
                removed_statements += n_removed
        if query_string != "":
            # Flush the final (possibly partial) batch.
            result &= self._query(query_string, triplestore_url, base_dir,
                                  added_statements, removed_statements)
        return result