Coverage for src / sparqlite / client.py: 100%

88 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-21 11:59 +0000

1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5"""Synchronous SPARQL client.""" 

6 

7import json 

8import time 

9import warnings 

10from io import BytesIO 

11from typing import Literal 

12from urllib.parse import parse_qs, urlencode, urlparse 

13 

14import pycurl 

15 

16from sparqlite.exceptions import EndpointError, QueryError 

17 

# HTTP verbs the client may use to transmit a request: with "GET" the
# parameters travel in the URL, with "POST" they travel form-encoded in the
# request body.
HTTPMethod = Literal["GET", "POST"]

19 

20 

class SPARQLClient:
    """Synchronous SPARQL client built on a single reusable pycurl handle.

    One ``pycurl.Curl`` handle is created per client and reused for every
    request. Release it with :meth:`close`, or use the client as a context
    manager::

        with SPARQLClient("https://example.org/sparql") as client:
            rows = client.select("SELECT * WHERE { ?s ?p ?o } LIMIT 1")

    Requests that fail with an HTTP 5xx status or a libcurl transport error are
    retried with exponential backoff. HTTP 400 raises :class:`QueryError` and
    any other 4xx status raises :class:`EndpointError`, both without retrying.
    """

    def __init__(
        self,
        endpoint: str,
        *,
        max_retries: int = 5,
        backoff_factor: float = 0.5,
        timeout: float | None = None,
    ):
        """Create a client bound to ``endpoint``.

        Args:
            endpoint: SPARQL endpoint URL. Query-string parameters it carries
                (including repeated keys) are forwarded with every request.
            max_retries: Additional attempts allowed after the first one on
                retryable failures. Negative values behave like 0.
            backoff_factor: Before retry number ``k`` (1-based) the client
                sleeps ``backoff_factor * 2**k`` seconds.
            timeout: Maximum duration of each attempt in seconds (libcurl
                ``TIMEOUT_MS``), or ``None`` to leave that option unset.
        """
        self.endpoint = endpoint
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = timeout
        self._curl: pycurl.Curl | None = pycurl.Curl()

    def __enter__(self) -> "SPARQLClient":
        return self

    def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
        self.close()

    def __del__(self) -> None:
        # getattr: if __init__ raised before ``_curl`` was assigned (e.g.
        # pycurl.Curl() failed), the attribute does not exist at all.
        if getattr(self, "_curl", None) is not None:
            warnings.warn(
                "SPARQLClient was not closed. Use 'with SPARQLClient(...) as client:' "
                "or call 'client.close()' explicitly.",
                ResourceWarning,
                stacklevel=2,
            )
            self.close()

    def close(self) -> None:
        """Release the underlying curl handle; calling it again is a no-op."""
        if self._curl is not None:
            self._curl.close()
            self._curl = None

    @staticmethod
    def _split_endpoint(endpoint: str) -> tuple[str, list[tuple[str, str]]]:
        """Split ``endpoint`` into its base URL and its query-string pairs.

        Repeated keys (e.g. several ``default-graph-uri`` values) and keys
        with blank values are all preserved, in first-seen key order.
        """
        parsed_url = urlparse(endpoint)
        base_endpoint = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
        query_pairs = parse_qs(parsed_url.query, keep_blank_values=True)
        endpoint_params = [
            (key, value) for key, values in query_pairs.items() for value in values
        ]
        return base_endpoint, endpoint_params

    @staticmethod
    def _decode_body(body: bytes) -> str:
        """Decode a response body for an error message without ever raising."""
        return body.decode(errors="replace")

    @staticmethod
    def _transport_error(exc: "pycurl.error") -> EndpointError:
        """Translate a ``pycurl.error`` into an :class:`EndpointError`."""
        error_code, error_msg = exc.args
        if error_code in (pycurl.E_COULDNT_CONNECT, pycurl.E_COULDNT_RESOLVE_HOST):
            return EndpointError(f"Connection error: {error_msg}")
        if error_code == pycurl.E_OPERATION_TIMEDOUT:
            return EndpointError(f"Timeout error: {error_msg}")
        return EndpointError(f"Request error: {error_msg}")

    def _request(
        self,
        query: str,
        accept: str,
        *,
        method: HTTPMethod = "GET",
        is_update: bool = False,
    ) -> bytes:
        """Send ``query`` to the endpoint and return the raw response body.

        Args:
            query: SPARQL query or update text.
            accept: Value for the ``Accept`` request header.
            method: ``"GET"`` puts the parameters in the URL; ``"POST"`` sends
                them URL-encoded as the request body.
            is_update: Send the text as the ``update`` parameter instead of
                ``query``.

        Returns:
            The body of the first attempt whose HTTP status is below 400.

        Raises:
            QueryError: The endpoint answered HTTP 400.
            EndpointError: The client is closed, the endpoint answered another
                4xx status, or every attempt failed with a 5xx status or a
                transport error (the last such error is raised).
        """
        if self._curl is None:
            raise EndpointError("Client is closed")

        curl = self._curl

        # Everything below is identical for every attempt, so build it once.
        base_endpoint, endpoint_params = self._split_endpoint(self.endpoint)
        param_key = "update" if is_update else "query"
        # The caller's text always wins over a same-named parameter that may be
        # baked into the endpoint URL.
        params = [(param_key, query)] + [
            (key, value) for key, value in endpoint_params if key != param_key
        ]
        encoded_params = urlencode(params)

        last_error: EndpointError | None = None

        # At least one attempt is always made; with a negative max_retries the
        # loop would otherwise be empty and ``raise None`` would follow.
        for attempt in range(max(self.max_retries, 0) + 1):
            if attempt > 0:
                time.sleep(self.backoff_factor * (2**attempt))

            buffer = BytesIO()
            # reset() wipes every option left over from the previous attempt.
            curl.reset()

            if self.timeout is not None:
                curl.setopt(pycurl.TIMEOUT_MS, int(self.timeout * 1000))

            try:
                if method == "GET":
                    curl.setopt(pycurl.URL, f"{base_endpoint}?{encoded_params}")
                    curl.setopt(pycurl.HTTPGET, 1)
                else:
                    curl.setopt(pycurl.URL, base_endpoint)
                    curl.setopt(pycurl.POSTFIELDS, encoded_params)

                curl.setopt(pycurl.WRITEDATA, buffer)
                curl.setopt(
                    pycurl.HTTPHEADER,
                    [
                        f"Accept: {accept}",
                        "User-Agent: sparqlite/0.1.0",
                    ],
                )

                curl.perform()
                status_code = curl.getinfo(pycurl.RESPONSE_CODE)
            except pycurl.error as e:
                # Transport-level failures (connect, DNS, timeout, ...) are retried.
                last_error = self._transport_error(e)
                continue

            body = buffer.getvalue()

            if status_code == 400:
                raise QueryError(f"Query syntax error: {self._decode_body(body)}")

            if status_code >= 500:
                # NOTE(review): this also re-sends SPARQL updates; that is only
                # harmless when the update is idempotent.
                last_error = EndpointError(
                    f"Server error: {status_code}",
                    status_code=status_code,
                )
                continue

            if status_code >= 400:
                raise EndpointError(
                    f"HTTP error: {status_code} - {self._decode_body(body)}",
                    status_code=status_code,
                )

            return body

        # Reached only when every attempt hit a retryable failure, and at least
        # one attempt always runs, so last_error is set here.
        raise last_error  # type: ignore[misc]

    def query(self, query: str, *, method: HTTPMethod = "GET") -> dict:
        """Run ``query`` requesting SPARQL JSON results and return the decoded JSON."""
        content = self._request(
            query, "application/sparql-results+json", method=method
        )
        return json.loads(content)

    def select(self, query: str, *, method: HTTPMethod = "GET") -> dict:
        """Run a SELECT query; alias of :meth:`query`."""
        return self.query(query, method=method)

    def ask(self, query: str, *, method: HTTPMethod = "GET") -> bool:
        """Run an ASK query and return the ``boolean`` field of the JSON result."""
        return self.query(query, method=method)["boolean"]

    def construct(self, query: str, *, method: HTTPMethod = "GET") -> bytes:
        """Run a CONSTRUCT query and return the raw N-Triples response body."""
        return self._request(query, "application/n-triples", method=method)

    def describe(self, query: str, *, method: HTTPMethod = "GET") -> bytes:
        """Run a DESCRIBE query and return the raw N-Triples response body."""
        return self._request(query, "application/n-triples", method=method)

    def update(self, query: str) -> None:
        """Send a SPARQL update via POST (``update`` parameter); the body is discarded."""
        self._request(
            query, "application/sparql-results+json", method="POST", is_update=True
        )