Coverage for src / sparqlite / client.py: 100%
88 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-21 11:59 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-21 11:59 +0000
1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5"""Synchronous SPARQL client."""
7import json
8import time
9import warnings
10from io import BytesIO
11from typing import Literal
12from urllib.parse import parse_qs, urlencode, urlparse
14import pycurl
16from sparqlite.exceptions import EndpointError, QueryError
# The HTTP verbs a caller may choose for read requests; updates always POST.
HTTPMethod = Literal["GET", "POST"]
class SPARQLClient:
    """Synchronous SPARQL client backed by a single reusable pycurl handle.

    The handle is created in ``__init__`` and released by :meth:`close`. Use
    the client as a context manager (``with SPARQLClient(url) as client:``) or
    call :meth:`close` explicitly; a client that is garbage-collected while
    still open emits a ``ResourceWarning``.

    Transport failures (any ``pycurl.error``) and HTTP 5xx responses are
    retried up to ``max_retries`` additional times, sleeping
    ``backoff_factor * 2**attempt`` seconds before retry number ``attempt``.
    """

    def __init__(
        self,
        endpoint: str,
        *,
        max_retries: int = 5,
        backoff_factor: float = 0.5,
        timeout: float | None = None,
    ):
        """Create a client for ``endpoint``.

        Args:
            endpoint: SPARQL endpoint URL. Any query-string parameters it
                carries are forwarded with every request.
            max_retries: Number of retries after the first attempt for
                retryable failures. Negative values behave like ``0``.
            backoff_factor: Base, in seconds, of the exponential backoff
                between attempts.
            timeout: Per-request timeout in seconds; ``None`` sets no timeout
                option and so leaves libcurl's default in place.
        """
        self.endpoint = endpoint
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = timeout
        self._curl: pycurl.Curl | None = pycurl.Curl()

    def __enter__(self) -> "SPARQLClient":
        return self

    def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
        self.close()

    def __del__(self) -> None:
        """Warn about, then release, a handle that was never closed."""
        # getattr: if __init__ raised before _curl was assigned (e.g.
        # pycurl.Curl() failed), the finalizer must not raise AttributeError.
        if getattr(self, "_curl", None) is not None:
            warnings.warn(
                "SPARQLClient was not closed. Use 'with SPARQLClient(...) as client:' "
                "or call 'client.close()' explicitly.",
                ResourceWarning,
                stacklevel=2,
            )
            self.close()

    def close(self) -> None:
        """Release the curl handle. Calling it again is a no-op."""
        if self._curl is not None:
            self._curl.close()
            self._curl = None

    def _request(
        self,
        query: str,
        accept: str,
        *,
        method: HTTPMethod = "GET",
        is_update: bool = False,
    ) -> bytes:
        """Send ``query`` to the endpoint and return the raw response body.

        Args:
            query: SPARQL query or update text.
            accept: Value for the ``Accept`` request header.
            method: ``"GET"`` puts the parameters in the URL; any other value
                sends them as a form-encoded POST body.
            is_update: Send the text as the ``update`` parameter instead of
                ``query``.

        Returns:
            The body of the first response whose status code is below 300.

        Raises:
            EndpointError: If the client is closed; on a non-retryable HTTP
                error (3xx, or 4xx other than 400); or once every attempt has
                failed with a transport error or a 5xx response.
            QueryError: If the endpoint answers HTTP 400.
        """
        if self._curl is None:
            raise EndpointError("Client is closed")

        curl = self._curl
        # URL splitting and parameter encoding do not change between
        # attempts, so they are done once up front.
        base_endpoint, encoded_params = self._build_target(query, is_update)

        last_error: EndpointError | None = None
        # max(..., 0) guarantees at least one attempt, so last_error is always
        # set if the loop finishes without returning or raising.
        for attempt in range(max(self.max_retries, 0) + 1):
            if attempt > 0:
                time.sleep(self.backoff_factor * (2**attempt))

            buffer = BytesIO()
            try:
                self._configure(curl, buffer, base_endpoint, encoded_params, accept, method)
                curl.perform()
                status_code = curl.getinfo(pycurl.RESPONSE_CODE)
            except pycurl.error as exc:
                last_error = self._transport_error(exc)
                continue

            # Redirects are not followed (FOLLOWLOCATION is never enabled), so
            # a 3xx body is never a SPARQL result and must not be returned.
            if status_code < 300:
                return buffer.getvalue()

            if status_code >= 500:
                last_error = EndpointError(
                    f"Server error: {status_code}",
                    status_code=status_code,
                )
                continue

            body = self._decode_body(buffer.getvalue())
            if status_code == 400:
                raise QueryError(f"Query syntax error: {body}")
            raise EndpointError(
                f"HTTP error: {status_code} - {body}",
                status_code=status_code,
            )

        assert last_error is not None
        raise last_error

    def _build_target(self, query: str, is_update: bool) -> tuple[str, str]:
        """Return ``(base_url, encoded_params)`` for a request.

        ``base_url`` is the endpoint URL without its query string or fragment.
        ``encoded_params`` is the form-encoded query/update text followed by
        the endpoint's own query-string parameters.
        """
        parsed = urlparse(self.endpoint)
        base_url = parsed._replace(query="", fragment="").geturl()

        param_key = "update" if is_update else "query"
        params = [(param_key, query)]
        # Forward every value of every endpoint parameter (including blank
        # ones), so repeated parameters such as several default-graph-uri
        # values survive. A parameter with the same name as the query/update
        # key is skipped so it cannot replace the caller's text.
        params.extend(
            (key, value)
            for key, values in parse_qs(parsed.query, keep_blank_values=True).items()
            if key != param_key
            for value in values
        )
        return base_url, urlencode(params)

    def _configure(
        self,
        curl: pycurl.Curl,
        buffer: BytesIO,
        base_endpoint: str,
        encoded_params: str,
        accept: str,
        method: HTTPMethod,
    ) -> None:
        """Reset ``curl`` and set the options for one request attempt."""
        curl.reset()
        if self.timeout is not None:
            curl.setopt(pycurl.TIMEOUT_MS, int(self.timeout * 1000))

        if method == "GET":
            curl.setopt(pycurl.URL, f"{base_endpoint}?{encoded_params}")
            curl.setopt(pycurl.HTTPGET, 1)
        else:
            curl.setopt(pycurl.URL, base_endpoint)
            curl.setopt(pycurl.POSTFIELDS, encoded_params)

        curl.setopt(pycurl.WRITEDATA, buffer)
        curl.setopt(
            pycurl.HTTPHEADER,
            [
                f"Accept: {accept}",
                "User-Agent: sparqlite/0.1.0",
            ],
        )

    @staticmethod
    def _transport_error(exc: pycurl.error) -> EndpointError:
        """Translate a ``pycurl.error`` into a descriptive ``EndpointError``."""
        error_code, error_msg = exc.args
        if error_code in (pycurl.E_COULDNT_CONNECT, pycurl.E_COULDNT_RESOLVE_HOST):
            return EndpointError(f"Connection error: {error_msg}")
        if error_code == pycurl.E_OPERATION_TIMEDOUT:
            return EndpointError(f"Timeout error: {error_msg}")
        return EndpointError(f"Request error: {error_msg}")

    @staticmethod
    def _decode_body(body: bytes) -> str:
        """Decode an error body for a message without ever raising.

        Malformed UTF-8 is replaced rather than raising ``UnicodeDecodeError``,
        which would otherwise hide the HTTP error being reported.
        """
        return body.decode("utf-8", errors="replace")

    def query(self, query: str, *, method: HTTPMethod = "GET") -> dict:
        """Run a query requesting SPARQL JSON results and return the parsed JSON."""
        content = self._request(
            query, "application/sparql-results+json", method=method
        )
        return json.loads(content)

    def select(self, query: str, *, method: HTTPMethod = "GET") -> dict:
        """Run a SELECT query; alias of :meth:`query`."""
        return self.query(query, method=method)

    def ask(self, query: str, *, method: HTTPMethod = "GET") -> bool:
        """Run an ASK query and return the ``boolean`` field of the JSON result."""
        return self.query(query, method=method)["boolean"]

    def construct(self, query: str, *, method: HTTPMethod = "GET") -> bytes:
        """Run a CONSTRUCT query and return the raw N-Triples response body."""
        return self._request(query, "application/n-triples", method=method)

    def describe(self, query: str, *, method: HTTPMethod = "GET") -> bytes:
        """Run a DESCRIBE query and return the raw N-Triples response body."""
        return self._request(query, "application/n-triples", method=method)

    def update(self, query: str) -> None:
        """POST ``query`` as a SPARQL update; the response body is discarded."""
        self._request(
            query, "application/sparql-results+json", method="POST", is_update=True
        )