Coverage for test/test_utils.py: 58%
90 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-12-20 08:55 +0000
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# Copyright (c) 2022-2025 OpenCitations
4#
5# Permission to use, copy, modify, and/or distribute this software for any purpose
6# with or without fee is hereby granted, provided that the above copyright notice
7# and this permission notice appear in all copies.
8#
9# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
11# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
12# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
13# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
14# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
15# SOFTWARE.
17import random
18import subprocess
19import time
21import redis
22from rdflib import Graph
23from sparqlite import SPARQLClient
25# Common constants
26SERVER = "http://127.0.0.1:8805/sparql"
27PROV_SERVER = "http://127.0.0.1:8806/sparql"
28VIRTUOSO_CONTAINER = "oc-meta-test-virtuoso"
29VIRTUOSO_PROV_CONTAINER = "oc-meta-test-virtuoso-prov"
31# Redis configuration
32REDIS_HOST = "localhost"
33REDIS_PORT = 6381
34REDIS_DB = 5 # For counters
35REDIS_CACHE_DB = 2 # For cache
def reset_server() -> None:
    """
    Reset the SPARQL servers using Virtuoso's RDF_GLOBAL_RESET() via docker exec isql.

    Both the main and the provenance triplestores are reset. Each reset is
    retried up to ``max_retries`` times with exponential backoff plus random
    jitter to avoid race conditions between concurrent test runs.

    Raises:
        subprocess.CalledProcessError: if a reset keeps failing (non-zero exit)
            after all retries.
        subprocess.TimeoutExpired: if a reset keeps timing out after all retries.
    """
    max_retries = 5
    base_delay = 2

    def _reset_triplestore(container: str, label: str) -> None:
        # Run RDF_GLOBAL_RESET() inside the given Virtuoso container via isql.
        command = [
            "docker", "exec", container,
            "/opt/virtuoso-opensource/bin/isql", "1111", "dba", "dba",
            "exec=RDF_GLOBAL_RESET();"
        ]
        for attempt in range(max_retries):
            try:
                # Add small random delay to avoid race conditions
                time.sleep(base_delay + random.uniform(0, 1))
                subprocess.run(
                    command,
                    capture_output=True,  # Use capture_output instead of stdout/stderr pipes
                    text=True,            # Decode output as text
                    check=True,           # Raise CalledProcessError on non-zero exit code
                    timeout=20,
                )
                return  # success
            except subprocess.CalledProcessError as e:
                print(f"Error resetting {label} triplestore (attempt {attempt+1}/{max_retries}): {e}")
                if attempt == max_retries - 1:
                    raise
                # Exponential backoff with jitter
                time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))
            except subprocess.TimeoutExpired:
                print(f"Timeout resetting {label} triplestore (attempt {attempt+1}/{max_retries})")
                if attempt == max_retries - 1:
                    raise
                # Exponential backoff with jitter
                time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))

    # Reset main triplestore, then the provenance triplestore.
    _reset_triplestore(VIRTUOSO_CONTAINER, "main")
    _reset_triplestore(VIRTUOSO_PROV_CONTAINER, "provenance")
def reset_redis_counters():
    """
    Reset the Redis counters and cache databases.

    Flushes both the counter database (REDIS_DB) and the cache
    database (REDIS_CACHE_DB) on the test Redis instance.
    """
    for database in (REDIS_DB, REDIS_CACHE_DB):
        client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=database)
        client.flushdb()
def execute_sparql_query(endpoint, query, max_retries=3, delay=5):
    """
    Execute a SPARQL SELECT query with retry logic.

    Args:
        endpoint (str): SPARQL endpoint URL
        query (str): SPARQL SELECT query to execute
        max_retries (int): Maximum number of retry attempts
        delay (int): Delay between retries in seconds (used as backoff_factor)

    Returns:
        Query results in JSON format (dict)

    Raises:
        URLError: If connection fails after all retries
    """
    from urllib.error import URLError

    try:
        with SPARQLClient(endpoint, max_retries=max_retries, backoff_factor=delay, timeout=60) as client:
            return client.query(query)
    except Exception as e:
        # Chain the original exception so the root cause is preserved
        # in the traceback instead of only its string form.
        raise URLError(
            f"Failed to connect to SPARQL endpoint after {max_retries} attempts: {str(e)}"
        ) from e
def execute_sparql_construct(endpoint, query, max_retries=3, delay=5):
    """
    Execute a SPARQL CONSTRUCT query with retry logic.

    Args:
        endpoint (str): SPARQL endpoint URL
        query (str): SPARQL CONSTRUCT query to execute
        max_retries (int): Maximum number of retry attempts
        delay (int): Delay between retries in seconds (used as backoff_factor)

    Returns:
        rdflib.Graph: Graph parsed from the N-Triples response

    Raises:
        URLError: If connection fails after all retries
    """
    from urllib.error import URLError

    try:
        with SPARQLClient(endpoint, max_retries=max_retries, backoff_factor=delay, timeout=60) as client:
            g = Graph()
            g.parse(data=client.construct(query), format='nt')
            return g
    except Exception as e:
        # Chain the original exception so the root cause is preserved
        # in the traceback instead of only its string form.
        raise URLError(
            f"Failed to connect to SPARQL endpoint after {max_retries} attempts: {str(e)}"
        ) from e
def wait_for_virtuoso(server: str, max_wait: int = 60) -> bool:
    """Wait for Virtuoso SPARQL endpoint to be ready.

    Args:
        server: SPARQL endpoint URL
        max_wait: Maximum time to wait in seconds

    Returns:
        True if service is ready, False if timeout
    """
    deadline = time.time() + max_wait
    while time.time() < deadline:
        try:
            # A trivial probe query: any successful response means the
            # endpoint is up and answering SPARQL.
            with SPARQLClient(server, max_retries=1, backoff_factor=1, timeout=60) as probe:
                probe.query("SELECT * WHERE { ?s ?p ?o } LIMIT 1")
            return True
        except Exception:
            time.sleep(2)
    return False
def wait_for_redis(host: str = REDIS_HOST, port: int = REDIS_PORT, max_wait: int = 10) -> bool:
    """Wait for Redis to be ready.

    Args:
        host: Redis host
        port: Redis port
        max_wait: Maximum time to wait in seconds

    Returns:
        True if service is ready, False if timeout
    """
    deadline = time.time() + max_wait
    while time.time() < deadline:
        try:
            # PING succeeds only once the server accepts connections.
            redis.Redis(host=host, port=port).ping()
            return True
        except Exception:
            time.sleep(1)
    return False