Coverage for test/test_utils.py: 58%

90 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-12-20 08:55 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2022-2025 OpenCitations 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17import random 

18import subprocess 

19import time 

20 

21import redis 

22from rdflib import Graph 

23from sparqlite import SPARQLClient 

24 

# Common constants
SERVER = "http://127.0.0.1:8805/sparql"  # main triplestore SPARQL endpoint
PROV_SERVER = "http://127.0.0.1:8806/sparql"  # provenance triplestore SPARQL endpoint
VIRTUOSO_CONTAINER = "oc-meta-test-virtuoso"  # docker container running the main store
VIRTUOSO_PROV_CONTAINER = "oc-meta-test-virtuoso-prov"  # docker container running the provenance store

# Redis configuration
REDIS_HOST = "localhost"
REDIS_PORT = 6381
REDIS_DB = 5 # For counters
REDIS_CACHE_DB = 2 # For cache

37 

def reset_server() -> None:
    """
    Reset both SPARQL servers using Virtuoso's RDF_GLOBAL_RESET() via docker exec isql.

    Resets the main triplestore first, then the provenance triplestore.

    Raises:
        subprocess.CalledProcessError: if a reset command keeps exiting non-zero
            after all retries.
        subprocess.TimeoutExpired: if a reset command keeps timing out after all
            retries.
    """
    _reset_triplestore(VIRTUOSO_CONTAINER, "main")
    _reset_triplestore(VIRTUOSO_PROV_CONTAINER, "provenance")


def _reset_triplestore(
    container: str, label: str, max_retries: int = 5, base_delay: float = 2
) -> None:
    """Run RDF_GLOBAL_RESET() inside *container*, retrying with backoff.

    Args:
        container: Docker container name hosting the Virtuoso instance.
        label: Human-readable name ("main"/"provenance") used in log messages.
        max_retries: Number of attempts before the last error is re-raised.
        base_delay: Base delay (seconds) for the pre-attempt pause and backoff.
    """
    command = [
        "docker", "exec", container,
        "/opt/virtuoso-opensource/bin/isql", "1111", "dba", "dba",
        "exec=RDF_GLOBAL_RESET();",
    ]
    for attempt in range(max_retries):
        try:
            # Small random delay to avoid race conditions
            time.sleep(base_delay + random.uniform(0, 1))

            subprocess.run(
                command,
                capture_output=True,  # Use capture_output instead of stdout/stderr pipes
                text=True,            # Decode output as text
                check=True,           # Raise CalledProcessError on non-zero exit code
                timeout=20,
            )
            return  # Success

        except subprocess.CalledProcessError as e:
            print(f"Error resetting {label} triplestore (attempt {attempt+1}/{max_retries}): {e}")
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))

        except subprocess.TimeoutExpired:
            print(f"Timeout resetting {label} triplestore (attempt {attempt+1}/{max_retries})")
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))

118 

119 

def reset_redis_counters():
    """
    Flush the two Redis databases used by the tests: the counters DB
    (REDIS_DB) and the cache DB (REDIS_CACHE_DB).
    """
    # Same host/port for both; only the logical database number differs.
    for db_index in (REDIS_DB, REDIS_CACHE_DB):
        redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=db_index).flushdb()

130 

131 

def execute_sparql_query(endpoint, query, max_retries=3, delay=5):
    """
    Execute a SPARQL SELECT query with retry logic.

    Args:
        endpoint (str): SPARQL endpoint URL
        query (str): SPARQL SELECT query to execute
        max_retries (int): Maximum number of retry attempts
        delay (int): Delay between retries in seconds (used as backoff_factor)

    Returns:
        Query results in JSON format (dict)

    Raises:
        URLError: If connection fails after all retries
    """
    try:
        with SPARQLClient(endpoint, max_retries=max_retries, backoff_factor=delay, timeout=60) as client:
            return client.query(query)
    except Exception as e:
        from urllib.error import URLError
        # Chain the original exception so the real failure isn't lost.
        raise URLError(
            f"Failed to connect to SPARQL endpoint after {max_retries} attempts: {str(e)}"
        ) from e

156 

157 

def execute_sparql_construct(endpoint, query, max_retries=3, delay=5):
    """
    Execute a SPARQL CONSTRUCT query with retry logic.

    Args:
        endpoint (str): SPARQL endpoint URL
        query (str): SPARQL CONSTRUCT query to execute
        max_retries (int): Maximum number of retry attempts
        delay (int): Delay between retries in seconds (used as backoff_factor)

    Returns:
        rdflib.Graph: graph parsed from the N-Triples returned by the endpoint

    Raises:
        URLError: If connection fails after all retries
    """
    try:
        with SPARQLClient(endpoint, max_retries=max_retries, backoff_factor=delay, timeout=60) as client:
            g = Graph()
            g.parse(data=client.construct(query), format='nt')
            return g
    except Exception as e:
        from urllib.error import URLError
        # Chain the original exception so the real failure isn't lost.
        raise URLError(
            f"Failed to connect to SPARQL endpoint after {max_retries} attempts: {str(e)}"
        ) from e

169 

170 

def wait_for_virtuoso(server: str, max_wait: int = 60) -> bool:
    """Wait for Virtuoso SPARQL endpoint to be ready.

    Polls the endpoint with a trivial SELECT until it answers or the
    deadline passes.

    Args:
        server: SPARQL endpoint URL
        max_wait: Maximum time to wait in seconds

    Returns:
        True if service is ready, False if timeout
    """
    deadline = time.time() + max_wait
    while time.time() < deadline:
        try:
            with SPARQLClient(server, max_retries=1, backoff_factor=1, timeout=60) as probe:
                probe.query("SELECT * WHERE { ?s ?p ?o } LIMIT 1")
            return True
        except Exception:
            # Not up yet; pause briefly before the next probe.
            time.sleep(2)
    return False

190 

191 

def wait_for_redis(host: str = REDIS_HOST, port: int = REDIS_PORT, max_wait: int = 10) -> bool:
    """Wait for Redis to be ready.

    Pings the server repeatedly until it responds or the deadline passes.

    Args:
        host: Redis host
        port: Redis port
        max_wait: Maximum time to wait in seconds

    Returns:
        True if service is ready, False if timeout
    """
    deadline = time.time() + max_wait
    while time.time() < deadline:
        try:
            redis.Redis(host=host, port=port).ping()
            return True
        except Exception:
            # Not up yet; pause briefly before the next ping.
            time.sleep(1)
    return False