Coverage for oc_ocdm / support / sparql.py: 71%

48 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-08 20:23 +0000

1# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import json 

8import time 

9from urllib.error import HTTPError, URLError 

10from urllib.parse import parse_qs, urlparse 

11 

12from SPARQLWrapper import JSON, N3, POST, URLENCODED, SPARQLWrapper 

13 

14 

class SPARQLEndpointError(Exception):
    """Error raised when a request to a SPARQL endpoint fails.

    The human-readable message is stored by ``Exception``; the optional
    HTTP status associated with the failure is kept on ``status_code``.
    """

    def __init__(self, message: str, status_code: int | None = None):
        # ``None`` means no HTTP status is attached to this failure.
        self.status_code = status_code
        super().__init__(message)

19 

20 

def _make_sparql_client(endpoint: str) -> SPARQLWrapper:
    """Build a ``SPARQLWrapper`` client for *endpoint*.

    The endpoint URL is split into its base (scheme, network location and
    path) and its query string. The base becomes the wrapper's target URL
    and every key/value pair of the query string is registered on the
    client through ``addParameter``.

    Args:
        endpoint: Endpoint URL, optionally carrying a query string.

    Returns:
        A client that targets the base URL with the extra parameters set.
    """
    parsed = urlparse(endpoint)
    base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
    sparql = SPARQLWrapper(base_url)
    for key, values in parse_qs(parsed.query).items():
        # A key may appear more than once in the query string; forward
        # every value instead of silently keeping only the first one.
        for value in values:
            sparql.addParameter(key, value)
    return sparql

28 

29 

30def _execute_with_retry( 

31 endpoint: str, 

32 query: str, 

33 return_format: str, 

34 *, 

35 is_update: bool = False, 

36 max_retries: int = 5, 

37 backoff_factor: float = 0.5, 

38) -> bytes: 

39 sparql = _make_sparql_client(endpoint) 

40 sparql.setQuery(query) 

41 sparql.setReturnFormat(return_format) 

42 if is_update: 

43 sparql.setMethod(POST) 

44 sparql.setRequestMethod(URLENCODED) 

45 

46 last_error: SPARQLEndpointError | None = None 

47 

48 for attempt in range(max_retries + 1): 

49 if attempt > 0: 

50 time.sleep(backoff_factor * (2 ** attempt)) 

51 try: 

52 return sparql.query().response.read() 

53 except HTTPError as e: 

54 if e.code == 400: 

55 raise SPARQLEndpointError( 

56 f"Query syntax error: {e.read().decode()}", status_code=400 

57 ) from e 

58 if e.code >= 500: 

59 last_error = SPARQLEndpointError( 

60 f"Server error: {e.code}", status_code=e.code 

61 ) 

62 continue 

63 raise SPARQLEndpointError( 

64 f"HTTP error: {e.code} - {e.read().decode()}", status_code=e.code 

65 ) from e 

66 except URLError as e: 

67 last_error = SPARQLEndpointError(f"Connection error: {e.reason}") 

68 continue 

69 

70 raise last_error # type: ignore[misc] 

71 

72 

def sparql_query(
    endpoint: str,
    query: str,
    *,
    max_retries: int = 5,
    backoff_factor: float = 0.5,
) -> dict:
    """Run *query* against *endpoint* and return the decoded JSON result.

    The request is delegated to ``_execute_with_retry`` with the ``JSON``
    return format; the raw bytes it returns are parsed with ``json.loads``.

    Args:
        endpoint: Endpoint URL.
        query: SPARQL query text.
        max_retries: Forwarded to ``_execute_with_retry``.
        backoff_factor: Forwarded to ``_execute_with_retry``.

    Returns:
        The parsed JSON document.
    """
    payload = _execute_with_retry(
        endpoint,
        query,
        JSON,
        max_retries=max_retries,
        backoff_factor=backoff_factor,
    )
    result = json.loads(payload)
    return result

84 

85 

def sparql_update(
    endpoint: str,
    query: str,
    *,
    max_retries: int = 5,
    backoff_factor: float = 0.5,
) -> None:
    """Send the update *query* to *endpoint*, discarding the response body.

    The request is delegated to ``_execute_with_retry`` with
    ``is_update=True`` and the ``JSON`` return format.

    Args:
        endpoint: Endpoint URL.
        query: SPARQL update text.
        max_retries: Forwarded to ``_execute_with_retry``.
        backoff_factor: Forwarded to ``_execute_with_retry``.
    """
    retry_options = {
        "max_retries": max_retries,
        "backoff_factor": backoff_factor,
    }
    _execute_with_retry(endpoint, query, JSON, is_update=True, **retry_options)

96 

97 

def sparql_construct(
    endpoint: str,
    query: str,
    *,
    max_retries: int = 5,
    backoff_factor: float = 0.5,
) -> bytes:
    """Run *query* against *endpoint* and return the raw response bytes.

    The request is delegated to ``_execute_with_retry`` with the ``N3``
    return format; the body is returned without parsing.

    Args:
        endpoint: Endpoint URL.
        query: SPARQL query text.
        max_retries: Forwarded to ``_execute_with_retry``.
        backoff_factor: Forwarded to ``_execute_with_retry``.

    Returns:
        The unparsed bytes of the response.
    """
    serialized = _execute_with_retry(
        endpoint,
        query,
        N3,
        max_retries=max_retries,
        backoff_factor=backoff_factor,
    )
    return serialized