Coverage for meta_prov_fixer / virtuoso_watchdog.py: 0%
72 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:12 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-16 15:12 +0000
1import time
2import threading
3import logging
4import docker
5import traceback
6from SPARQLWrapper import SPARQLWrapper, JSON
7from sparqlite import SPARQLClient
def wait_for_sparql(endpoint: str, timeout: int = 120) -> bool:
    """Wait until the SPARQL endpoint answers a trivial ASK query.

    Polls ``endpoint`` with ``ASK {}`` until one query succeeds or
    ``timeout`` seconds have elapsed, pausing up to 2 seconds between
    failed attempts.

    :param endpoint: SPARQL endpoint URL
    :param timeout: Maximum total time (seconds) to keep polling
    :return: True as soon as one query succeeds, False if the deadline
        passes without a successful query
    """
    sparql = SPARQLWrapper(endpoint)
    sparql.setQuery("ASK {}")
    sparql.setReturnFormat(JSON)

    retry_delay = 2
    # Use the monotonic clock so wall-clock adjustments (NTP, DST) can
    # neither shorten nor extend the wait.
    deadline = time.monotonic() + timeout

    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False

        # Bound each request by the time left; without this a single hung
        # connection could block far longer than ``timeout``.
        sparql.setTimeout(max(1, int(remaining)))
        try:
            sparql.query()
            return True
        except Exception:
            # Any failure (connection refused, HTTP error, request timeout)
            # just means "not ready yet": back off briefly and retry, but
            # never sleep past the deadline.
            time.sleep(min(retry_delay, max(0.0, deadline - time.monotonic())))
def sparql_healthcheck(endpoint: str, timeout: int = 5) -> bool:
    """Probe the SPARQL endpoint with a one-row SELECT.

    :param endpoint: SPARQL endpoint URL
    :param timeout: Accepted but not used by this function.
        NOTE(review): it is never passed to the client -- confirm whether
        it should be wired into ``SPARQLClient``.
    :return: True if the query returns at least one binding; False if it
        returns none or if any exception is raised while querying
    """
    probe_query = """
    SELECT ?s ?p ?o
    WHERE { ?s ?p ?o }
    LIMIT 1
    """

    try:
        with SPARQLClient(endpoint) as client:
            bindings = client.query(probe_query)["results"]["bindings"]
            # An empty result set counts as unhealthy, same as an error.
            return len(bindings) > 0
    except Exception:
        return False
def _container_memory_usage(container):
    """Return ``(effective_used_bytes, limit_bytes)`` for *container*, or None.

    None is returned when the stats payload has no usable memory figures
    (missing ``usage``/``limit`` or a zero limit), so the caller can skip
    the check instead of failing on a KeyError or division by zero.
    """
    mem = container.stats(stream=False).get("memory_stats") or {}
    used = mem.get("usage")
    limit = mem.get("limit")
    if used is None or not limit:
        return None
    # Subtract inactive file cache so it does not count towards the
    # threshold (treated as 0 when the field is absent).
    cache = (mem.get("stats") or {}).get("inactive_file", 0)
    return used - cache, limit


def monitor_and_restart(
    container_name: str,
    endpoint: str,
    threshold: float = 0.98,
    restart_interval: int = 10800,  # 3 hours
    mem_check_interval: int = 3600,
    healthcheck_interval: int = 180,
):
    """
    Background watchdog loop for the Virtuoso Docker container.

    Runs forever (never returns) and on each pass:

    - restarts the container when more than ``restart_interval`` seconds
      have passed since the last restart (or since the watchdog started)
    - every ``mem_check_interval`` seconds, reads the container memory
      usage and restarts the container if it exceeds ``threshold``

    After every restart the loop sleeps 15 minutes and then checks whether
    the SPARQL endpoint responds again. Any exception raised during a pass
    is logged and the loop continues.

    :param container_name: Name of the Docker container running Virtuoso
    :param endpoint: SPARQL endpoint URL
    :param threshold: Memory usage threshold (fraction of limit) to trigger restart
    :param restart_interval: Interval (seconds) between restarts, to force periodic restarts even if memory usage is below threshold (default 3 hours)
    :param mem_check_interval: Interval (seconds) between memory usage checks
    :param healthcheck_interval: Currently unused -- healthcheck-triggered
        restarts are disabled; kept so existing callers keep working
    """
    client = docker.from_env()
    GiB = 1024 ** 3

    # Intervals are measured on the monotonic clock so system clock changes
    # cannot trigger or suppress restarts. The restart timer starts at
    # watchdog start-up, so the first forced restart happens only after a
    # full ``restart_interval``; the first memory check runs immediately.
    last_restart = time.monotonic()
    last_mem_check = float("-inf")

    while True:
        now = time.monotonic()

        try:
            container = client.containers.get(container_name)

            # --- Periodic forced restart ---
            since_restart = now - last_restart
            if since_restart > restart_interval:
                logging.warning(
                    f"[Virtuoso watchdog] Restart interval exceeded "
                    f"({since_restart / 3600:.2f} hours since last restart) -> restarting container"
                )
                container.restart()
                last_restart = time.monotonic()

                logging.info("[Virtuoso watchdog] Sleeping 15 minutes to allow Virtuoso to restart and stabilize before healthcheck...")
                time.sleep(900)

                if sparql_healthcheck(endpoint):
                    logging.info("[Virtuoso watchdog] SPARQL endpoint is back online")
                else:
                    logging.error("[Virtuoso watchdog] SPARQL endpoint DID NOT recover within timeout!")

                # The long sleep above made ``now`` stale; refresh it so the
                # memory-check bookkeeping below uses the real current time.
                now = time.monotonic()

            # NOTE: restarting on a failed SPARQL healthcheck is currently
            # disabled; only the periodic and memory-based restarts are active.

            # --- Container memory usage check ---
            if now - last_mem_check >= mem_check_interval:
                last_mem_check = now

                usage = _container_memory_usage(container)
                if usage is None:
                    logging.warning(
                        "[Virtuoso watchdog] Memory stats unavailable -> skipping memory check"
                    )
                else:
                    effective_used, limit = usage
                    ratio = effective_used / limit

                    logging.info(
                        f"[Virtuoso watchdog] Mem use: "
                        f"{effective_used/GiB:.2f}GiB / {limit/GiB:.2f}GiB "
                        f"({ratio*100:.1f}%)"
                    )

                    if ratio > threshold:
                        logging.warning(
                            "[Virtuoso watchdog] Memory above threshold -> restarting container"
                        )
                        container.restart()
                        # Count this as a restart too, so a forced periodic
                        # restart does not follow right after it.
                        last_restart = time.monotonic()

                        logging.info("[Virtuoso watchdog] Waiting for SPARQL endpoint to recover…")
                        logging.info("Sleeping 15 minutes to allow Virtuoso to restart and stabilize before healthcheck...")
                        time.sleep(900)

                        if wait_for_sparql(endpoint):
                            logging.info("[Virtuoso watchdog] SPARQL endpoint is back online")
                        else:
                            logging.error("[Virtuoso watchdog] SPARQL endpoint DID NOT recover within timeout!")

        except Exception:
            # Keep the watchdog alive whatever goes wrong (Docker API errors,
            # missing container, ...); log with traceback and retry next pass.
            logging.error("[Virtuoso watchdog] Unexpected error", exc_info=True)

        # Small sleep to avoid busy-waiting
        time.sleep(5)
def start_watchdog_thread(container_name: str, endpoint: str):
    """Launch ``monitor_and_restart`` in a background daemon thread.

    The thread is started with the default watchdog settings and, being a
    daemon thread, does not keep the process alive on its own. Nothing is
    returned.

    :param container_name: Name of the Docker container to watch
    :param endpoint: SPARQL endpoint URL passed to the watchdog
    """
    watchdog = threading.Thread(
        target=monitor_and_restart,
        args=(container_name, endpoint),
        daemon=True,
    )
    watchdog.start()