Coverage for test/test_utils.py: 61%

71 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

1#!/usr/bin/python 

2# -*- coding: utf-8 -*- 

3# Copyright (c) 2022-2025 OpenCitations 

4# 

5# Permission to use, copy, modify, and/or distribute this software for any purpose 

6# with or without fee is hereby granted, provided that the above copyright notice 

7# and this permission notice appear in all copies. 

8# 

9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH 

10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND 

11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, 

12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, 

13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 

14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS 

15# SOFTWARE. 

16 

17import random 

18import subprocess 

19import time 

20 

21import redis 

22from SPARQLWrapper import JSON, SPARQLWrapper 

23 

# Common constants
SERVER = "http://127.0.0.1:8805/sparql"  # main triplestore SPARQL endpoint
PROV_SERVER = "http://127.0.0.1:8806/sparql"  # provenance triplestore SPARQL endpoint
# Docker container names used by reset_server() to run isql commands.
# NOTE(review): presumably VIRTUOSO_CONTAINER backs SERVER and the
# *-prov container backs PROV_SERVER — confirm against docker-compose config.
VIRTUOSO_CONTAINER = "oc-meta-test-virtuoso"
VIRTUOSO_PROV_CONTAINER = "oc-meta-test-virtuoso-prov"

# Redis configuration
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = 5 # For counters
REDIS_CACHE_DB = 2 # For cache

35 

36 

def reset_server() -> None:
    """
    Reset the SPARQL servers using Virtuoso's RDF_GLOBAL_RESET() via docker exec isql.

    Both the main and the provenance triplestores are wiped. Each reset is
    attempted up to ``max_retries`` times with exponential backoff plus
    jitter; the final failure (non-zero exit or timeout) is re-raised.
    """
    max_retries = 5
    base_delay = 2

    def _isql_reset_command(container: str) -> list:
        # Build the docker-exec invocation that runs RDF_GLOBAL_RESET()
        # inside the given Virtuoso container via its isql CLI.
        return [
            "docker", "exec", container,
            "/opt/virtuoso-opensource/bin/isql", "1111", "dba", "dba",
            "exec=RDF_GLOBAL_RESET();",
        ]

    def _reset_with_retries(command: list, label: str) -> None:
        # Run `command`, retrying on failure or timeout with exponential
        # backoff and jitter; re-raise on the final attempt so callers see
        # the underlying subprocess error.
        for attempt in range(max_retries):
            try:
                # Add small random delay to avoid race conditions
                time.sleep(base_delay + random.uniform(0, 1))

                subprocess.run(
                    command,
                    capture_output=True,  # Use capture_output instead of stdout/stderr pipes
                    text=True,            # Decode output as text
                    check=True,           # Raise CalledProcessError on non-zero exit code
                    timeout=20,
                )
                return  # Success: stop retrying this store.

            except subprocess.CalledProcessError as e:
                print(f"Error resetting {label} triplestore (attempt {attempt+1}/{max_retries}): {e}")
                if attempt == max_retries - 1:
                    raise
                # Exponential backoff with jitter
                time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))

            except subprocess.TimeoutExpired:
                print(f"Timeout resetting {label} triplestore (attempt {attempt+1}/{max_retries})")
                if attempt == max_retries - 1:
                    raise
                # Exponential backoff with jitter
                time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))

    # Reset main triplestore, then the provenance triplestore.
    _reset_with_retries(_isql_reset_command(VIRTUOSO_CONTAINER), "main")
    _reset_with_retries(_isql_reset_command(VIRTUOSO_PROV_CONTAINER), "provenance")

117 

118 

def reset_redis_counters():
    """
    Reset the Redis counters and cache databases.

    Flushes the counters DB first, then the cache DB, both on the
    configured test Redis host/port.
    """
    for db_index in (REDIS_DB, REDIS_CACHE_DB):
        client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=db_index)
        client.flushdb()

129 

130 

def execute_sparql_query(endpoint, query, return_format=JSON, max_retries=3, delay=5):
    """
    Execute a SPARQL query with retry logic and better error handling.

    Args:
        endpoint (str): SPARQL endpoint URL
        query (str): SPARQL query to execute
        return_format: Query return format (JSON, XML etc)
        max_retries (int): Maximum number of retry attempts
        delay (int): Delay between retries in seconds

    Returns:
        Query results in specified format

    Raises:
        URLError: If connection fails after all retries. The original
            exception is attached as ``__cause__`` via chaining so the
            underlying failure is not lost.
    """
    # Hoisted out of the retry loop (was previously imported inside the
    # except branch); kept function-local to match the file's style.
    from urllib.error import URLError

    sparql = SPARQLWrapper(endpoint)
    sparql.setQuery(query)
    sparql.setReturnFormat(return_format)

    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            sparql.setTimeout(30)  # Increase timeout
            return sparql.queryAndConvert()
        except Exception as e:
            last_error = e
            if attempt == max_retries:
                # Chain the underlying exception so its traceback survives.
                raise URLError(
                    f"Failed to connect to SPARQL endpoint after {max_retries} attempts: {str(last_error)}"
                ) from last_error
            print(
                f"Connection attempt {attempt} failed, retrying in {delay} seconds..."
            )
            time.sleep(delay)  # Increased delay between retries