Coverage for test/on_triplestore_test.py: 96% (166 statements)

coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
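"""Tests for the triplestore upload pipeline: CacheManager persistence (a JSON
file with optional Redis mirroring), SPARQL update generation and execution,
and the TriplestoreConnection singleton.

The suite assumes a local SPARQL endpoint at http://127.0.0.1:8805/sparql
(SERVER) and Redis test DB 2 (TEST_REDIS_DB); the TestOnTriplestore cases skip
themselves when either service is unreachable.
"""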

import json
import os
import shutil
import unittest
from unittest.mock import patch

import redis
from oc_meta.run.upload.cache_manager import CacheManager
from oc_meta.run.upload.on_triplestore import (
    execute_sparql_update,
    generate_sparql_queries,
    save_failed_query_file,
    upload_sparql_updates,
)
from oc_meta.run.upload.triplestore_connection import TriplestoreConnection
from SPARQLWrapper import POST, SPARQLWrapper

SERVER = "http://127.0.0.1:8805/sparql"
TEST_REDIS_DB = 2


def reset_triplestore():
    """Reset the test triplestore"""
    sparql = SPARQLWrapper(SERVER)
    sparql.setMethod(POST)
    # Remove every triple from every named graph
    sparql.setQuery("DELETE WHERE { GRAPH ?g { ?s ?p ?o } }")
    sparql.query()


def reset_redis():
    """Reset the test Redis database"""
    try:
        r = redis.Redis(db=TEST_REDIS_DB, decode_responses=True)
        r.flushdb()
    except redis.ConnectionError:
        print("Warning: Redis not available for tests")


class TestCacheManager(unittest.TestCase):
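    """Unit tests for CacheManager: loading an existing JSON cache, add and
    membership operations, on-disk persistence, and the JSON-only fallback
    when Redis is unavailable."""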

    def setUp(self):
        """Set up for each test"""
        self.temp_dir = os.path.join("test", "temp_cache_test")
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
        os.makedirs(self.temp_dir)

        self.cache_file = os.path.join(self.temp_dir, "test_cache.json")
        reset_redis()

    def tearDown(self):
        """Clean up after each test"""
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
        reset_redis()

    def test_cache_initialization(self):
        """Test cache initialization"""
        # Create a pre-existing JSON cache file
        initial_files = ["file1.sparql", "file2.sparql"]
        with open(self.cache_file, "w") as f:
            json.dump(initial_files, f)

        # Initialize CacheManager with the test DB
        cache_manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)

        # Verify that the files were loaded
        self.assertEqual(cache_manager.get_all(), set(initial_files))

        # Verify that the files were synced to Redis
        if cache_manager._redis:
            redis_files = cache_manager._redis.smembers(CacheManager.REDIS_KEY)
            self.assertEqual(redis_files, set(initial_files))

    def test_add_and_contains(self):
        """Test adding files and checking membership"""
        cache_manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)

        # Add a file
        test_file = "test.sparql"
        cache_manager.add(test_file)

        # Verify presence in the local cache
        self.assertIn(test_file, cache_manager.processed_files)

        # Verify presence in Redis
        if cache_manager._redis:
            self.assertTrue(
                cache_manager._redis.sismember(CacheManager.REDIS_KEY, test_file)
            )

        # Verify the `in` operator
        self.assertIn(test_file, cache_manager)

    def test_persistence(self):
        """Test data persistence"""
        # Create and populate a cache manager
        cache_manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)
        test_files = ["test1.sparql", "test2.sparql"]
        for file in test_files:
            cache_manager.add(file)

        # Force a save to disk
        cache_manager._cleanup()

        # Verify the contents of the JSON file
        with open(self.cache_file, "r") as f:
            saved_files = set(json.load(f))
        self.assertEqual(saved_files, set(test_files))

        # Create a new cache manager and verify that it loads the data correctly
        new_cache_manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)
        self.assertEqual(new_cache_manager.get_all(), set(test_files))

    @patch("redis.Redis")
    def test_redis_fallback(self, mock_redis):
        """Test the fallback to JSON when Redis is unavailable"""
        # Simulate Redis being unavailable
        mock_redis.side_effect = redis.ConnectionError()

        # Create the cache manager
        cache_manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)

        # Verify that it is using JSON only
        self.assertIsNone(cache_manager._redis)

        # Verify that operations still work
        test_file = "test.sparql"
        cache_manager.add(test_file)
        self.assertIn(test_file, cache_manager)


class TestOnTriplestore(unittest.TestCase):
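    """Integration tests against the local test triplestore: SPARQL query
    generation, update execution, failed-query logging, and the stop-file
    mechanism for interrupting uploads."""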

    def setUp(self):
        """Set up for each test"""
        self.temp_dir = os.path.join("test", "temp_triplestore_test")
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
        os.makedirs(self.temp_dir)

        self.cache_file = os.path.join(self.temp_dir, "ts_upload_cache.json")
        self.failed_file = os.path.join(self.temp_dir, "failed_queries.txt")
        self.stop_file = os.path.join(self.temp_dir, ".stop_upload")

        try:
            reset_triplestore()
            reset_redis()
        except Exception as e:
            self.skipTest(f"Triplestore or Redis not available: {str(e)}")

    def tearDown(self):
        """Clean up after each test"""
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)

    def test_generate_sparql_queries(self):
        """Test SPARQL query generation"""
        # Prepare the test data
        quads_to_add = [
            ("<subj1>", "<pred1>", '"obj1"', "<graph1>"),
            ("<subj2>", "<pred2>", '"obj2"', "<graph1>"),
            ("<subj3>", "<pred3>", '"obj3"', "<graph2>"),
        ]
        quads_to_remove = [("<subj4>", "<pred4>", '"obj4"', "<graph1>")]

        # Generate the queries
        queries = generate_sparql_queries(quads_to_add, quads_to_remove, batch_size=2)

        # Verify the result: 3 added quads with batch_size=2 yield 2 INSERT
        # queries, plus 1 DELETE for the removals
        self.assertEqual(len(queries), 3)
        self.assertTrue(any(q.startswith("INSERT DATA {") for q in queries))
        self.assertTrue(any(q.startswith("DELETE DATA {") for q in queries))

    def test_cache_operations(self):
        """Test cache operations via CacheManager"""
        # Initialize the cache manager with the test DB
        cache_manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)

        # Add files to the cache
        test_files = {"file1.sparql", "file2.sparql"}
        for file in test_files:
            cache_manager.add(file)

        # Force a save to disk
        cache_manager._cleanup()

        # Verify that the cache file exists
        self.assertTrue(os.path.exists(self.cache_file))

        # Create a new cache manager and verify the contents
        new_cache_manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)
        self.assertEqual(new_cache_manager.get_all(), test_files)

    def test_failed_query_logging(self):
        """Test logging of failed queries"""
        test_file = "failed_test.sparql"
        save_failed_query_file(test_file, self.failed_file)

        with open(self.failed_file, "r") as f:
            content = f.read()
            self.assertIn(test_file, content)

    def test_execute_sparql_update(self):
        """Test executing a SPARQL update"""
        # Test with a valid query
        valid_query = """
            INSERT DATA {
                GRAPH <http://test.graph> {
                    <http://test.subject> <http://test.predicate> "test object" .
                }
            }
        """
        success = execute_sparql_update(SERVER, valid_query)
        self.assertTrue(success)

        # Test with an invalid query
        invalid_query = "INVALID SPARQL QUERY"
        success = execute_sparql_update(SERVER, invalid_query)
        self.assertFalse(success)

    def test_upload_with_stop_file(self):
        """Test interrupting the upload via the stop file"""
        # Create a temporary directory with SPARQL files
        sparql_dir = os.path.join(self.temp_dir, "sparql_files")
        os.makedirs(sparql_dir)

        # Create some test SPARQL files
        test_query = """
            INSERT DATA {
                GRAPH <http://test.graph> {
                    <http://test.subject> <http://test.predicate> "test object" .
                }
            }
        """
        for i in range(3):
            with open(os.path.join(sparql_dir, f"test{i}.sparql"), "w") as f:
                f.write(test_query)

        # Create the stop file
        with open(self.stop_file, "w") as f:
            f.write("")

        # Run the upload
        upload_sparql_updates(
            SERVER,
            sparql_dir,
            batch_size=10,
            cache_file=self.cache_file,
            failed_file=self.failed_file,
            stop_file=self.stop_file,
            cache_manager=CacheManager(self.cache_file, redis_db=TEST_REDIS_DB),
        )

        # Verify that the cache does not contain all the files
        cache_manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)
        self.assertLess(len(cache_manager.get_all()), 3)

    def test_upload_with_failures(self):
        """Test an upload that includes failing queries"""
        # Create a temporary directory with SPARQL files
        sparql_dir = os.path.join(self.temp_dir, "sparql_files")
        os.makedirs(sparql_dir)

        # Create a valid SPARQL file
        valid_query = """
            INSERT DATA {
                GRAPH <http://test.graph> {
                    <http://test.subject> <http://test.predicate> "test object" .
                }
            }
        """
        with open(os.path.join(sparql_dir, "valid.sparql"), "w") as f:
            f.write(valid_query)

        # Create an invalid SPARQL file
        invalid_query = "INVALID SPARQL QUERY"
        with open(os.path.join(sparql_dir, "invalid.sparql"), "w") as f:
            f.write(invalid_query)

        # Run the upload
        upload_sparql_updates(
            SERVER,
            sparql_dir,
            batch_size=10,
            cache_file=self.cache_file,
            failed_file=self.failed_file,
            stop_file=self.stop_file,
            cache_manager=CacheManager(self.cache_file, redis_db=TEST_REDIS_DB),
        )

        # Verify the results using CacheManager
        cache_manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)
        self.assertIn("valid.sparql", cache_manager)
        self.assertNotIn("invalid.sparql", cache_manager)

        # Verify the failed-queries file
        with open(self.failed_file, "r") as f:
            failed_content = f.read()
            self.assertIn("invalid.sparql", failed_content)


class TestTriplestoreConnection(unittest.TestCase):
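    """Tests for the TriplestoreConnection singleton: instance reuse,
    endpoint updates, and update execution."""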

    def setUp(self):
        self.endpoint = SERVER
        self.connection = TriplestoreConnection(self.endpoint)

    def test_singleton_pattern(self):
        """Verify that the same instance is reused"""
        connection2 = TriplestoreConnection(self.endpoint)
        self.assertIs(self.connection, connection2)

    def test_connection_update(self):
        """Verify that the connection endpoint can be updated"""
        new_endpoint = "http://new.endpoint/sparql"
        connection2 = TriplestoreConnection(new_endpoint)
        self.assertEqual(connection2.sparql.endpoint, new_endpoint)

    def test_execute_update(self):
        """Test executing an update query"""
        query = """
            INSERT DATA {
                GRAPH <http://test.graph> {
                    <http://test.subject> <http://test.predicate> "test object" .
                }
            }
        """
        success = self.connection.execute_update(query)
        self.assertTrue(success)


if __name__ == "__main__":
    unittest.main()