Coverage for src / sparqlite / client.py: 100%
77 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-20 08:07 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-20 08:07 +0000
1"""Synchronous SPARQL client."""
3import json
4import time
5import warnings
6from io import BytesIO
7from urllib.parse import urlencode
9import pycurl
11from sparqlite.exceptions import EndpointError, QueryError
14class SPARQLClient:
15 """Synchronous SPARQL 1.1 client with connection pooling and automatic retry."""
17 def __init__(
18 self,
19 endpoint: str,
20 *,
21 max_retries: int = 5,
22 backoff_factor: float = 0.5,
23 timeout: float | None = None,
24 ):
25 """Initialize the SPARQL client.
27 Args:
28 endpoint: The SPARQL endpoint URL.
29 max_retries: Maximum number of retry attempts for transient errors.
30 backoff_factor: Factor for exponential backoff (wait = factor * 2^retry).
31 timeout: Request timeout in seconds. None means no timeout.
32 """
33 self.endpoint = endpoint
34 self.max_retries = max_retries
35 self.backoff_factor = backoff_factor
36 self.timeout = timeout
37 self._curl = pycurl.Curl()
39 def __enter__(self) -> "SPARQLClient":
40 return self
42 def __exit__(self, exc_type, exc_val, exc_tb) -> None:
43 self.close()
45 def __del__(self) -> None:
46 if self._curl is not None:
47 warnings.warn(
48 "SPARQLClient was not closed. Use 'with SPARQLClient(...) as client:' "
49 "or call 'client.close()' explicitly.",
50 ResourceWarning,
51 stacklevel=2,
52 )
53 self.close()
55 def close(self) -> None:
56 """Close the client and release resources.
58 This method is idempotent - calling it multiple times is safe.
59 """
60 if self._curl is not None:
61 self._curl.close()
62 self._curl = None
64 def _request(
65 self,
66 query: str,
67 accept: str,
68 *,
69 is_update: bool = False,
70 ) -> bytes:
71 """Execute an HTTP request with retry logic."""
72 last_error = None
74 for attempt in range(self.max_retries + 1):
75 if attempt > 0:
76 wait_time = self.backoff_factor * (2**attempt)
77 time.sleep(wait_time)
79 buffer = BytesIO()
80 self._curl.reset()
82 if self.timeout is not None:
83 self._curl.setopt(pycurl.TIMEOUT_MS, int(self.timeout * 1000))
85 try:
86 self._curl.setopt(pycurl.URL, self.endpoint)
87 self._curl.setopt(pycurl.WRITEDATA, buffer)
88 self._curl.setopt(
89 pycurl.HTTPHEADER,
90 [
91 f"Accept: {accept}",
92 "User-Agent: sparqlite/0.1.0",
93 ],
94 )
96 if is_update:
97 post_data = urlencode({"update": query})
98 else:
99 post_data = urlencode({"query": query})
101 self._curl.setopt(pycurl.POSTFIELDS, post_data)
102 self._curl.perform()
104 status_code = self._curl.getinfo(pycurl.RESPONSE_CODE)
106 if status_code == 400:
107 raise QueryError(f"Query syntax error: {buffer.getvalue().decode()}")
109 if status_code >= 500:
110 last_error = EndpointError(
111 f"Server error: {status_code}",
112 status_code=status_code,
113 )
114 continue
116 if status_code >= 400:
117 raise EndpointError(
118 f"HTTP error: {status_code} - {buffer.getvalue().decode()}",
119 status_code=status_code,
120 )
122 return buffer.getvalue()
124 except pycurl.error as e:
125 error_code, error_msg = e.args
126 if error_code in (pycurl.E_COULDNT_CONNECT, pycurl.E_COULDNT_RESOLVE_HOST):
127 last_error = EndpointError(f"Connection error: {error_msg}")
128 elif error_code == pycurl.E_OPERATION_TIMEDOUT:
129 last_error = EndpointError(f"Timeout error: {error_msg}")
130 else:
131 last_error = EndpointError(f"Request error: {error_msg}")
132 continue
134 raise last_error
136 def query(self, query: str) -> dict:
137 """Execute a SELECT query.
139 Args:
140 query: The SPARQL SELECT query string.
142 Returns:
143 Dictionary with SPARQL JSON results format.
144 """
145 content = self._request(query, "application/sparql-results+json")
146 return json.loads(content)
148 def select(self, query: str) -> dict:
149 """Execute a SELECT query. Alias for query()."""
150 return self.query(query)
152 def ask(self, query: str) -> bool:
153 """Execute an ASK query.
155 Args:
156 query: The SPARQL ASK query string.
158 Returns:
159 Boolean result of the ASK query.
160 """
161 content = self._request(query, "application/sparql-results+json")
162 return json.loads(content)["boolean"]
164 def construct(self, query: str) -> bytes:
165 """Execute a CONSTRUCT query.
167 Args:
168 query: The SPARQL CONSTRUCT query string.
170 Returns:
171 Raw N-Triples bytes.
172 """
173 return self._request(query, "application/n-triples")
175 def describe(self, query: str) -> bytes:
176 """Execute a DESCRIBE query.
178 Args:
179 query: The SPARQL DESCRIBE query string.
181 Returns:
182 Raw N-Triples bytes.
183 """
184 return self._request(query, "application/n-triples")
186 def update(self, query: str) -> None:
187 """Execute a SPARQL UPDATE query (INSERT, DELETE, etc.).
189 Args:
190 query: The SPARQL UPDATE query string.
191 """
192 self._request(query, "application/sparql-results+json", is_update=True)