Coverage for test/test_utils.py: 61% (71 statements)
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2022-2025 OpenCitations
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

import random
import subprocess
import time
from urllib.error import URLError

import redis
from SPARQLWrapper import JSON, SPARQLWrapper

# Common constants
SERVER = "http://127.0.0.1:8805/sparql"
PROV_SERVER = "http://127.0.0.1:8806/sparql"
VIRTUOSO_CONTAINER = "oc-meta-test-virtuoso"
VIRTUOSO_PROV_CONTAINER = "oc-meta-test-virtuoso-prov"

# Redis configuration
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = 5  # For counters
REDIS_CACHE_DB = 2  # For cache


def _reset_triplestore(container: str, label: str) -> None:
    """
    Reset one Virtuoso triplestore by running RDF_GLOBAL_RESET() through
    docker exec isql, retrying with exponential backoff and jitter.
    """
    max_retries = 5
    base_delay = 2
    command = [
        "docker", "exec", container,
        "/opt/virtuoso-opensource/bin/isql", "1111", "dba", "dba",
        "exec=RDF_GLOBAL_RESET();",
    ]

    for attempt in range(max_retries):
        try:
            # Small random delay to avoid race conditions when both
            # containers are reset in quick succession
            time.sleep(base_delay + random.uniform(0, 1))

            subprocess.run(
                command,
                capture_output=True,  # Capture stdout/stderr
                text=True,            # Decode output as text
                check=True,           # Raise CalledProcessError on non-zero exit code
                timeout=20,
            )
            return  # Success

        except subprocess.CalledProcessError as e:
            print(f"Error resetting {label} triplestore (attempt {attempt + 1}/{max_retries}): {e}")
            if attempt == max_retries - 1:
                raise
        except subprocess.TimeoutExpired:
            print(f"Timeout resetting {label} triplestore (attempt {attempt + 1}/{max_retries})")
            if attempt == max_retries - 1:
                raise

        # Exponential backoff with jitter before the next attempt
        time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))


def reset_server() -> None:
    """
    Reset the SPARQL servers using Virtuoso's RDF_GLOBAL_RESET() via docker exec isql.
    """
    _reset_triplestore(VIRTUOSO_CONTAINER, "main")
    _reset_triplestore(VIRTUOSO_PROV_CONTAINER, "provenance")
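
# For reference, the reset that reset_server() wraps is equivalent to running
# the same isql call manually from a shell (same arguments the helper passes
# to subprocess.run):
#
#     docker exec oc-meta-test-virtuoso \
#         /opt/virtuoso-opensource/bin/isql 1111 dba dba "exec=RDF_GLOBAL_RESET();"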


def reset_redis_counters() -> None:
    """
    Reset the Redis counters and cache databases.
    """
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
    redis_cache_client = redis.Redis(
        host=REDIS_HOST, port=REDIS_PORT, db=REDIS_CACHE_DB
    )
    redis_client.flushdb()
    redis_cache_client.flushdb()
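
# Illustrative sketch (an assumption, not part of this module): tests would
# typically call the two reset helpers together from a fixture so that both
# the triplestores and the Redis databases start from a clean state. The
# class name below is hypothetical.
#
#     import unittest
#
#     class CleanStateTestCase(unittest.TestCase):
#         def setUp(self):
#             reset_server()
#             reset_redis_counters()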


def execute_sparql_query(endpoint, query, return_format=JSON, max_retries=3, delay=5):
    """
    Execute a SPARQL query with retry logic and better error handling.

    Args:
        endpoint (str): SPARQL endpoint URL
        query (str): SPARQL query to execute
        return_format: Query return format (JSON, XML, etc.)
        max_retries (int): Maximum number of retry attempts
        delay (int): Delay between retries in seconds

    Returns:
        Query results in the specified format

    Raises:
        URLError: If the connection fails after all retries
    """
    sparql = SPARQLWrapper(endpoint)
    sparql.setQuery(query)
    sparql.setReturnFormat(return_format)
    sparql.setTimeout(30)  # Generous timeout for slow test endpoints

    retry_count = 0
    last_error = None

    while retry_count < max_retries:
        try:
            return sparql.queryAndConvert()
        except Exception as e:
            last_error = e
            retry_count += 1
            if retry_count == max_retries:
                raise URLError(
                    f"Failed to connect to SPARQL endpoint after {max_retries} attempts: {last_error}"
                ) from last_error
            print(
                f"Connection attempt {retry_count} failed, retrying in {delay} seconds..."
            )
            time.sleep(delay)
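
# Example usage (illustrative; the query and the ?n binding name are not taken
# from the tests): counting the triples currently stored in the main test
# triplestore, using the standard SPARQL JSON results layout.
#
#     results = execute_sparql_query(
#         SERVER, "SELECT (COUNT(*) AS ?n) WHERE { ?s ?p ?o }"
#     )
#     n_triples = int(results["results"]["bindings"][0]["n"]["value"])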