Coverage for oc_ocdm / support / sparql.py: 71%
48 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-08 20:23 +0000
1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7import json
8import time
9from urllib.error import HTTPError, URLError
10from urllib.parse import parse_qs, urlparse
12from SPARQLWrapper import JSON, N3, POST, URLENCODED, SPARQLWrapper
class SPARQLEndpointError(Exception):
    """Error raised when a request to a SPARQL endpoint fails.

    Attributes:
        status_code: HTTP status code associated with the failure, or ``None``
            when no status code was supplied.
    """

    def __init__(self, message: str, status_code: int | None = None):
        # Record the status first; the base class keeps the message in ``args``.
        self.status_code = status_code
        super().__init__(message)
def _make_sparql_client(endpoint: str) -> SPARQLWrapper:
    """Build a ``SPARQLWrapper`` client for *endpoint*, keeping its query-string parameters.

    The endpoint URL is split into a base URL (scheme, network location and
    path) and a query string. The client is created from the base URL, and
    every query-string parameter is registered on it with ``addParameter``.

    Repeated parameters (for example several ``default-graph-uri`` entries)
    are all forwarded, in the order they appear in the URL. Parameters with a
    blank value are skipped, because ``parse_qs`` drops them by default. The
    URL fragment is not carried over.

    Args:
        endpoint: Full URL of the SPARQL endpoint, optionally with a query string.

    Returns:
        A ``SPARQLWrapper`` instance targeting the base URL.
    """
    parsed = urlparse(endpoint)
    base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
    sparql = SPARQLWrapper(base_url)
    for key, values in parse_qs(parsed.query).items():
        # Forward every value, not just the first one, so that multi-valued
        # parameters are not silently truncated.
        for value in values:
            sparql.addParameter(key, value)
    return sparql
def _execute_with_retry(
    endpoint: str,
    query: str,
    return_format: str,
    *,
    is_update: bool = False,
    max_retries: int = 5,
    backoff_factor: float = 0.5,
) -> bytes:
    """Send a SPARQL request and return the raw response body, retrying transient failures.

    HTTP errors with a status of 500 or above and ``URLError`` connection
    failures are retried. Before retry number ``attempt`` (1-based) the
    function sleeps ``backoff_factor * 2 ** attempt`` seconds. Any other HTTP
    error is raised immediately without retrying.

    Args:
        endpoint: URL of the SPARQL endpoint (may include a query string).
        query: SPARQL query or update text.
        return_format: Return format passed to ``setReturnFormat``.
        is_update: When true, the request is sent with the POST method and
            URL-encoded request encoding.
        max_retries: Number of retries after the first attempt; must be >= 0.
        backoff_factor: Multiplier for the exponential backoff delay, in seconds.

    Returns:
        The response body as bytes.

    Raises:
        ValueError: If ``max_retries`` is negative.
        SPARQLEndpointError: On a non-retryable HTTP error, or once all
            attempts have failed (the last failure is raised, with the
            underlying exception attached as its cause).
    """
    # Without this guard a negative value skips the loop entirely and the
    # function would end up executing ``raise None``.
    if max_retries < 0:
        raise ValueError(f"max_retries must be non-negative, got {max_retries}")

    sparql = _make_sparql_client(endpoint)
    sparql.setQuery(query)
    sparql.setReturnFormat(return_format)
    if is_update:
        sparql.setMethod(POST)
        sparql.setRequestMethod(URLENCODED)

    last_error: SPARQLEndpointError | None = None

    for attempt in range(max_retries + 1):
        if attempt > 0:
            time.sleep(backoff_factor * (2 ** attempt))
        try:
            response = sparql.query().response
            try:
                return response.read()
            finally:
                # Release the connection even if reading the body fails.
                response.close()
        except HTTPError as e:
            # NOTE(review): SPARQLWrapper appears to translate some HTTP
            # statuses (400/401/404/414/500) into its own exception classes
            # before they reach this handler - confirm against the installed
            # version and decide whether those should be handled here too.
            if e.code == 400:
                # errors="replace" keeps a non-UTF-8 error body from masking
                # the real failure with a UnicodeDecodeError.
                raise SPARQLEndpointError(
                    f"Query syntax error: {e.read().decode(errors='replace')}",
                    status_code=400,
                ) from e
            if e.code >= 500:
                last_error = SPARQLEndpointError(
                    f"Server error: {e.code}", status_code=e.code
                )
                # Keep the original exception visible when this is raised later.
                last_error.__cause__ = e
                continue
            raise SPARQLEndpointError(
                f"HTTP error: {e.code} - {e.read().decode(errors='replace')}",
                status_code=e.code,
            ) from e
        except URLError as e:
            last_error = SPARQLEndpointError(f"Connection error: {e.reason}")
            last_error.__cause__ = e
            continue

    # The loop ran at least once and every path that did not return or raise
    # assigned last_error, so it cannot be None here.
    assert last_error is not None
    raise last_error
def sparql_query(
    endpoint: str,
    query: str,
    *,
    max_retries: int = 5,
    backoff_factor: float = 0.5,
) -> dict:
    """Run a SPARQL query against *endpoint* and return the decoded JSON response.

    The request is delegated to ``_execute_with_retry`` with the JSON return
    format; the raw body it returns is parsed with ``json.loads``.

    Args:
        endpoint: URL of the SPARQL endpoint.
        query: SPARQL query text.
        max_retries: Forwarded to ``_execute_with_retry``.
        backoff_factor: Forwarded to ``_execute_with_retry``.

    Returns:
        The parsed JSON document.

    Raises:
        SPARQLEndpointError: Propagated from ``_execute_with_retry``.
    """
    payload = _execute_with_retry(
        endpoint=endpoint,
        query=query,
        return_format=JSON,
        max_retries=max_retries,
        backoff_factor=backoff_factor,
    )
    decoded = json.loads(payload)
    return decoded
def sparql_update(
    endpoint: str,
    query: str,
    *,
    max_retries: int = 5,
    backoff_factor: float = 0.5,
) -> None:
    """Send a SPARQL update to *endpoint* and discard the response body.

    The request is delegated to ``_execute_with_retry`` with ``is_update=True``
    and the JSON return format.

    Args:
        endpoint: URL of the SPARQL endpoint.
        query: SPARQL update text.
        max_retries: Forwarded to ``_execute_with_retry``.
        backoff_factor: Forwarded to ``_execute_with_retry``.

    Raises:
        SPARQLEndpointError: Propagated from ``_execute_with_retry``.
    """
    _execute_with_retry(
        endpoint=endpoint,
        query=query,
        return_format=JSON,
        is_update=True,
        max_retries=max_retries,
        backoff_factor=backoff_factor,
    )
    return None
def sparql_construct(
    endpoint: str,
    query: str,
    *,
    max_retries: int = 5,
    backoff_factor: float = 0.5,
) -> bytes:
    """Run a SPARQL query against *endpoint* and return the raw response body.

    The request is delegated to ``_execute_with_retry`` with the N3 return
    format; the bytes it returns are passed back unchanged.

    Args:
        endpoint: URL of the SPARQL endpoint.
        query: SPARQL query text.
        max_retries: Forwarded to ``_execute_with_retry``.
        backoff_factor: Forwarded to ``_execute_with_retry``.

    Returns:
        The undecoded response body.

    Raises:
        SPARQLEndpointError: Propagated from ``_execute_with_retry``.
    """
    serialized = _execute_with_retry(
        endpoint=endpoint,
        query=query,
        return_format=N3,
        max_retries=max_retries,
        backoff_factor=backoff_factor,
    )
    return serialized