Coverage for rdflib_ocdm/storer.py: 93%

80 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-11-01 22:02 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright 2023 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17from __future__ import annotations 

18 

19import os 

20from datetime import datetime 

21from typing import TYPE_CHECKING 

22 

23from oc_ocdm.support.reporter import Reporter 

24from SPARQLWrapper import SPARQLWrapper 

25 

26from rdflib_ocdm.ocdm_graph import (OCDMDataset, OCDMGraph, 

27 OCDMGraphCommons) 

28from rdflib_ocdm.query_utils import get_update_query 

29from rdflib_ocdm.retry_utils import execute_with_retry 

30 

31if TYPE_CHECKING:

32 from typing import Set 

33 

34 from rdflib import Graph 

35 

36 

class Storer(object):
    """Persist the pending changes of an OCDM graph set to a SPARQL triplestore.

    Per-entity update queries are produced by ``get_update_query`` and sent
    in batches; progress and failures are reported through two ``Reporter``
    instances, and queries that could not be uploaded are dumped to disk for
    later replay.
    """

    def __init__(self, abstract_set: OCDMGraphCommons | Graph, repok: Reporter = None,
                 reperr: Reporter = None, output_format: str = "json-ld",
                 zip_output: bool = False) -> None:
        """
        :param abstract_set: the graph/set whose entities will be uploaded.
        :param repok: reporter for informational messages; a default
            "[Storer: INFO]" reporter is created when None.
        :param reperr: reporter for error messages; a default
            "[Storer: ERROR]" reporter is created when None.
        :param output_format: serialization format identifier; must be one of
            the supported N-Triples / N-Quads / JSON-LD names.
        :param zip_output: whether serialized output should be zipped.
        :raises ValueError: if ``output_format`` is not supported.
        """
        self.a_set = abstract_set
        supported_formats: Set[str] = {'application/n-triples', 'ntriples', 'nt', 'nt11',
                                       'application/n-quads', 'nquads', 'json-ld'}
        if output_format not in supported_formats:
            # Sorted so the error message is deterministic (set repr order is not).
            raise ValueError(f"Given output_format '{output_format}' is not supported."
                             f" Available formats: {sorted(supported_formats)}.")
        self.output_format: str = output_format
        self.zip_output = zip_output
        self.repok: Reporter = repok if repok is not None else Reporter(prefix="[Storer: INFO] ")
        self.reperr: Reporter = reperr if reperr is not None else Reporter(prefix="[Storer: ERROR] ")

    def _query(self, query_string: str, triplestore_url: str, base_dir: str = None,
               added_statements: int = 0, removed_statements: int = 0, max_retries: int = 5) -> bool:
        """Execute a SPARQL Update against ``triplestore_url``, retrying on failure.

        On definitive failure (all retries exhausted) the query text is
        written under ``base_dir/tp_err`` (when ``base_dir`` is given) so it
        can be replayed manually.

        :param query_string: the SPARQL Update to run; an empty string is a no-op.
        :param triplestore_url: SPARQL Update endpoint URL.
        :param base_dir: directory under which failed queries are dumped.
        :param added_statements: count of added triples, used only for reporting.
        :param removed_statements: count of removed triples, used only for reporting.
        :param max_retries: maximum attempts passed to ``execute_with_retry``.
        :returns: True on success, False on failure or empty query.
        """
        if query_string == "":
            return False
        try:
            def execute_query():
                # A fresh wrapper per attempt avoids reusing a connection that
                # may be left in a bad state by a failed try.
                sparql: SPARQLWrapper = SPARQLWrapper(triplestore_url)
                sparql.setQuery(query_string)
                sparql.setMethod('POST')
                sparql.query()
                return True

            execute_with_retry(
                execute_query,
                max_retries=max_retries,
                reporter=self.repok
            )

            self.repok.add_sentence(
                f"Triplestore updated with {added_statements} added statements and "
                f"with {removed_statements} removed statements.")
            return True
        except ValueError as e:
            # execute_with_retry raises ValueError once every retry has failed.
            self.reperr.add_sentence(f"[3] Graph was not loaded into the triplestore due to communication problems: {e}")
            if base_dir is not None:
                tp_err_dir: str = os.path.join(base_dir, "tp_err")
                # exist_ok avoids the check-then-create race of the previous
                # os.path.exists() guard when several storers run concurrently.
                os.makedirs(tp_err_dir, exist_ok=True)
                cur_file_err: str = os.path.join(
                    tp_err_dir,
                    datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f_not_uploaded.txt'))
                with open(cur_file_err, 'wt', encoding='utf-8') as f:
                    f.write(query_string)
            return False

    def upload_all(self, triplestore_url: str, base_dir: str = None, batch_size: int = 10) -> bool:
        """Upload every entity's pending update query, ``batch_size`` entities per request.

        :param triplestore_url: SPARQL Update endpoint URL.
        :param base_dir: directory where failed queries are dumped (see ``_query``).
        :param batch_size: how many per-entity queries are combined into one
            request; non-positive values fall back to the default of 10.
        :returns: True only if every batch was uploaded successfully.
        """
        self.repok.new_article()
        self.reperr.new_article()
        if batch_size <= 0:
            batch_size = 10
        query_string: str = ""
        added_statements: int = 0
        removed_statements: int = 0
        skipped_queries: int = 0
        result: bool = True
        # Data graphs/datasets produce 'graph' queries; anything else is provenance.
        entity_type = 'graph' if isinstance(self.a_set, (OCDMGraph, OCDMDataset)) else 'prov'
        # list() snapshots the entities — presumably so the underlying set may
        # be mutated while uploading; TODO confirm with callers.
        for idx, entity in enumerate(list(self.a_set.all_entities)):
            update_query, n_added, n_removed = get_update_query(self.a_set, entity, entity_type)
            if update_query == "":
                skipped_queries += 1
                continue
            # Index among the entities that actually produced a query.
            index = idx - skipped_queries
            if index % batch_size == 0:
                if index > 0:
                    # Flush the completed batch before starting a new one.
                    result &= self._query(query_string, triplestore_url, base_dir,
                                          added_statements, removed_statements)
                query_string = update_query
                added_statements = n_added
                removed_statements = n_removed
            else:
                # SPARQL 1.1 Update allows several operations in one request,
                # separated by ';'.
                query_string += " ; " + update_query
                added_statements += n_added
                removed_statements += n_removed
        if query_string != "":
            # Flush the final (possibly partial) batch.
            result &= self._query(query_string, triplestore_url, base_dir,
                                  added_statements, removed_statements)
        return result