Coverage for src / sparqlite / client.py: 100%

77 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-20 08:07 +0000

1"""Synchronous SPARQL client.""" 

2 

3import json 

4import time 

5import warnings 

6from io import BytesIO 

7from urllib.parse import urlencode 

8 

9import pycurl 

10 

11from sparqlite.exceptions import EndpointError, QueryError 

12 

13 

14class SPARQLClient: 

15 """Synchronous SPARQL 1.1 client with connection pooling and automatic retry.""" 

16 

17 def __init__( 

18 self, 

19 endpoint: str, 

20 *, 

21 max_retries: int = 5, 

22 backoff_factor: float = 0.5, 

23 timeout: float | None = None, 

24 ): 

25 """Initialize the SPARQL client. 

26 

27 Args: 

28 endpoint: The SPARQL endpoint URL. 

29 max_retries: Maximum number of retry attempts for transient errors. 

30 backoff_factor: Factor for exponential backoff (wait = factor * 2^retry). 

31 timeout: Request timeout in seconds. None means no timeout. 

32 """ 

33 self.endpoint = endpoint 

34 self.max_retries = max_retries 

35 self.backoff_factor = backoff_factor 

36 self.timeout = timeout 

37 self._curl = pycurl.Curl() 

38 

39 def __enter__(self) -> "SPARQLClient": 

40 return self 

41 

42 def __exit__(self, exc_type, exc_val, exc_tb) -> None: 

43 self.close() 

44 

45 def __del__(self) -> None: 

46 if self._curl is not None: 

47 warnings.warn( 

48 "SPARQLClient was not closed. Use 'with SPARQLClient(...) as client:' " 

49 "or call 'client.close()' explicitly.", 

50 ResourceWarning, 

51 stacklevel=2, 

52 ) 

53 self.close() 

54 

55 def close(self) -> None: 

56 """Close the client and release resources. 

57 

58 This method is idempotent - calling it multiple times is safe. 

59 """ 

60 if self._curl is not None: 

61 self._curl.close() 

62 self._curl = None 

63 

64 def _request( 

65 self, 

66 query: str, 

67 accept: str, 

68 *, 

69 is_update: bool = False, 

70 ) -> bytes: 

71 """Execute an HTTP request with retry logic.""" 

72 last_error = None 

73 

74 for attempt in range(self.max_retries + 1): 

75 if attempt > 0: 

76 wait_time = self.backoff_factor * (2**attempt) 

77 time.sleep(wait_time) 

78 

79 buffer = BytesIO() 

80 self._curl.reset() 

81 

82 if self.timeout is not None: 

83 self._curl.setopt(pycurl.TIMEOUT_MS, int(self.timeout * 1000)) 

84 

85 try: 

86 self._curl.setopt(pycurl.URL, self.endpoint) 

87 self._curl.setopt(pycurl.WRITEDATA, buffer) 

88 self._curl.setopt( 

89 pycurl.HTTPHEADER, 

90 [ 

91 f"Accept: {accept}", 

92 "User-Agent: sparqlite/0.1.0", 

93 ], 

94 ) 

95 

96 if is_update: 

97 post_data = urlencode({"update": query}) 

98 else: 

99 post_data = urlencode({"query": query}) 

100 

101 self._curl.setopt(pycurl.POSTFIELDS, post_data) 

102 self._curl.perform() 

103 

104 status_code = self._curl.getinfo(pycurl.RESPONSE_CODE) 

105 

106 if status_code == 400: 

107 raise QueryError(f"Query syntax error: {buffer.getvalue().decode()}") 

108 

109 if status_code >= 500: 

110 last_error = EndpointError( 

111 f"Server error: {status_code}", 

112 status_code=status_code, 

113 ) 

114 continue 

115 

116 if status_code >= 400: 

117 raise EndpointError( 

118 f"HTTP error: {status_code} - {buffer.getvalue().decode()}", 

119 status_code=status_code, 

120 ) 

121 

122 return buffer.getvalue() 

123 

124 except pycurl.error as e: 

125 error_code, error_msg = e.args 

126 if error_code in (pycurl.E_COULDNT_CONNECT, pycurl.E_COULDNT_RESOLVE_HOST): 

127 last_error = EndpointError(f"Connection error: {error_msg}") 

128 elif error_code == pycurl.E_OPERATION_TIMEDOUT: 

129 last_error = EndpointError(f"Timeout error: {error_msg}") 

130 else: 

131 last_error = EndpointError(f"Request error: {error_msg}") 

132 continue 

133 

134 raise last_error 

135 

136 def query(self, query: str) -> dict: 

137 """Execute a SELECT query. 

138 

139 Args: 

140 query: The SPARQL SELECT query string. 

141 

142 Returns: 

143 Dictionary with SPARQL JSON results format. 

144 """ 

145 content = self._request(query, "application/sparql-results+json") 

146 return json.loads(content) 

147 

148 def select(self, query: str) -> dict: 

149 """Execute a SELECT query. Alias for query().""" 

150 return self.query(query) 

151 

152 def ask(self, query: str) -> bool: 

153 """Execute an ASK query. 

154 

155 Args: 

156 query: The SPARQL ASK query string. 

157 

158 Returns: 

159 Boolean result of the ASK query. 

160 """ 

161 content = self._request(query, "application/sparql-results+json") 

162 return json.loads(content)["boolean"] 

163 

164 def construct(self, query: str) -> bytes: 

165 """Execute a CONSTRUCT query. 

166 

167 Args: 

168 query: The SPARQL CONSTRUCT query string. 

169 

170 Returns: 

171 Raw N-Triples bytes. 

172 """ 

173 return self._request(query, "application/n-triples") 

174 

175 def describe(self, query: str) -> bytes: 

176 """Execute a DESCRIBE query. 

177 

178 Args: 

179 query: The SPARQL DESCRIBE query string. 

180 

181 Returns: 

182 Raw N-Triples bytes. 

183 """ 

184 return self._request(query, "application/n-triples") 

185 

186 def update(self, query: str) -> None: 

187 """Execute a SPARQL UPDATE query (INSERT, DELETE, etc.). 

188 

189 Args: 

190 query: The SPARQL UPDATE query string. 

191 """ 

192 self._request(query, "application/sparql-results+json", is_update=True)