Coverage for meta_prov_fixer / virtuoso_watchdog.py: 0%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-16 15:12 +0000

1import time 

2import threading 

3import logging 

4import docker 

5import traceback 

6from SPARQLWrapper import SPARQLWrapper, JSON 

7from sparqlite import SPARQLClient 

8 

9 

def wait_for_sparql(endpoint: str, timeout: int = 120) -> bool:
    """Wait until the SPARQL endpoint answers a trivial ``ASK {}`` query.

    The endpoint is polled (about every 2 seconds) until one query completes
    without raising or the time budget is spent.

    :param endpoint: SPARQL endpoint URL
    :param timeout: Total time budget in seconds, shared by all attempts
    :return: True as soon as one query succeeds; False when the budget runs
        out first (no query is attempted when ``timeout`` is <= 0)
    """
    sparql = SPARQLWrapper(endpoint)
    sparql.setQuery("ASK {}")
    sparql.setReturnFormat(JSON)

    # Monotonic clock: elapsed-time measurement must not be affected by
    # wall-clock adjustments.
    deadline = time.monotonic() + timeout

    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False

        # Bound each request by what is left of the budget so that a single
        # hung connection cannot block far beyond ``timeout``. setTimeout
        # takes whole seconds, hence the floor of 1 (may overshoot by < 1 s).
        sparql.setTimeout(max(1, int(remaining)))

        try:
            sparql.query()
            return True
        except Exception:
            # Deliberately broad: any failure while polling is treated as
            # "not up yet" and retried until the deadline.
            pause = min(2.0, deadline - time.monotonic())
            if pause > 0:
                time.sleep(pause)

24 

25 

def sparql_healthcheck(endpoint: str, timeout: int = 5) -> bool:
    """Probe the endpoint with a one-row SELECT and report whether data came back.

    :param endpoint: SPARQL endpoint URL
    :param timeout: Not used by the current implementation.
        NOTE(review): the client is created without any timeout argument, so
        this value has no effect -- confirm whether it should be passed on.
    :return: True when the query yields at least one binding; False when it
        yields none (e.g. an empty store) or when anything raises
    """
    probe = """
    SELECT ?s ?p ?o
    WHERE { ?s ?p ?o }
    LIMIT 1
    """

    try:
        with SPARQLClient(endpoint) as client:
            bindings = client.query(probe)["results"]["bindings"]
            return len(bindings) > 0
    except Exception:
        # Every failure (connection, HTTP, unexpected payload shape) is
        # reported as "unhealthy" rather than propagated.
        return False

43 

44 

def monitor_and_restart(
    container_name: str,
    endpoint: str,
    threshold: float = 0.98,
    restart_interval: int = 10800,  # 3 hours
    mem_check_interval: int = 3600,
    healthcheck_interval: int = 180,
):
    """
    Background watchdog loop. Never returns; intended as a thread target.

    - Restarts the container on the first loop iteration and then whenever
      more than ``restart_interval`` seconds have passed since the last
      restart (scheduled or memory-triggered)
    - Checks Docker container memory usage every ``mem_check_interval``
      seconds and restarts the container if usage exceeds ``threshold``
    - After every restart, sleeps 15 minutes and then probes the SPARQL
      endpoint, logging whether it came back online

    Any exception raised inside one iteration is logged and the loop goes on.

    :param container_name: Name of the Docker container running Virtuoso
    :param endpoint: SPARQL endpoint URL
    :param threshold: Memory usage threshold (fraction of limit) to trigger restart
    :param restart_interval: Interval (seconds) between restarts, to force periodic restarts even if memory usage is below threshold (default 3 hours)
    :param mem_check_interval: Interval (seconds) between memory usage checks
    :param healthcheck_interval: Currently unused -- the periodic SPARQL
        healthcheck is disabled; the parameter is kept so existing callers
        keep working
    """
    client = docker.from_env()
    GiB = 1024 ** 3

    # None means "has not happened yet": the first iteration therefore does
    # both a restart and a memory check, as the previous zero-initialised
    # timestamps did.
    # NOTE(review): confirm the restart at watchdog start-up is intended.
    last_restart = None
    last_mem_check = None

    while True:
        # Monotonic clock so intervals are not distorted by wall-clock changes.
        now = time.monotonic()

        try:
            container = client.containers.get(container_name)

            # --- Scheduled restart ---
            if last_restart is None or now - last_restart > restart_interval:
                if last_restart is None:
                    logging.warning("[Virtuoso watchdog] Watchdog started -> restarting container")
                else:
                    # Report the elapsed time, not the raw timestamp.
                    elapsed_hours = (now - last_restart) / 3600
                    logging.warning(
                        "[Virtuoso watchdog] Restart interval exceeded "
                        f"({elapsed_hours:.2f} hours since last restart)-> restarting container"
                    )
                container.restart()
                last_restart = now

                logging.info("[Virtuoso watchdog] Sleeping 15 minutes to allow Virtuoso to restart and stabilize before healthcheck...")
                time.sleep(900)

                if sparql_healthcheck(endpoint):
                    logging.info("[Virtuoso watchdog] SPARQL endpoint is back online")
                else:
                    logging.error("[Virtuoso watchdog] SPARQL endpoint DID NOT recover within timeout!")

            # NOTE: a periodic SPARQL healthcheck that restarted the container
            # on failure used to sit here; it is disabled, which is why
            # ``healthcheck_interval`` is unused.

            # --- Container memory usage check ---
            if last_mem_check is None or now - last_mem_check >= mem_check_interval:
                last_mem_check = now

                stats = container.stats(stream=False)

                used = stats["memory_stats"]["usage"]
                limit = stats["memory_stats"]["limit"]
                # presumably reclaimable page cache, excluded so it does not
                # count towards the threshold -- TODO confirm
                cache = stats["memory_stats"]["stats"].get("inactive_file", 0)
                effective_used = used - cache
                ratio = effective_used / limit

                logging.info(
                    f"[Virtuoso watchdog] Mem use: "
                    f"{effective_used/GiB:.2f}GiB / {limit/GiB:.2f}GiB "
                    f"({ratio*100:.1f}%)"
                )

                if ratio > threshold:
                    logging.warning(
                        "[Virtuoso watchdog] Memory above threshold -> restarting container"
                    )
                    container.restart()
                    # Count this restart too, so a scheduled restart does not
                    # follow right after a memory-triggered one.
                    last_restart = time.monotonic()

                    logging.info("[Virtuoso watchdog] Waiting for SPARQL endpoint to recover…")
                    logging.info("Sleeping 15 minutes to allow Virtuoso to restart and stabilize before healthcheck...")
                    time.sleep(900)

                    if wait_for_sparql(endpoint):
                        logging.info("[Virtuoso watchdog] SPARQL endpoint is back online")
                    else:
                        logging.error("[Virtuoso watchdog] SPARQL endpoint DID NOT recover within timeout!")

        except Exception:
            # Top-level boundary of the watchdog: log with traceback and keep
            # looping rather than letting the thread die.
            logging.error("[Virtuoso watchdog] Unexpected error", exc_info=True)

        # Small sleep to avoid busy-waiting
        time.sleep(5)

149 

150 

def start_watchdog_thread(
    container_name: str, endpoint: str, **watchdog_kwargs
) -> threading.Thread:
    """Run ``monitor_and_restart`` in a daemon thread and return that thread.

    :param container_name: Name of the Docker container to watch
    :param endpoint: SPARQL endpoint URL
    :param watchdog_kwargs: Optional keyword overrides forwarded unchanged to
        ``monitor_and_restart`` (for example ``threshold`` or
        ``restart_interval``); omit them to use its defaults
    :return: The already-started thread, so callers can inspect it
    """
    watchdog = threading.Thread(
        target=monitor_and_restart,
        args=(container_name, endpoint),
        kwargs=watchdog_kwargs,
        name="virtuoso-watchdog",
        # Daemon thread: it must not keep the interpreter alive on exit.
        daemon=True,
    )
    watchdog.start()
    return watchdog