Coverage for test/on_triplestore_test.py: 96%
166 statements
« prev ^ index » next — coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
1import json
2import os
3import shutil
4import unittest
5from unittest.mock import patch
6import time
8import redis
9from oc_meta.run.upload.cache_manager import CacheManager
10from oc_meta.run.upload.on_triplestore import (
11 execute_sparql_update,
12 generate_sparql_queries,
13 save_failed_query_file,
14 upload_sparql_updates,
15)
16from oc_meta.run.upload.triplestore_connection import TriplestoreConnection
17from SPARQLWrapper import POST, SPARQLWrapper
# Local SPARQL endpoint used by the integration tests.
SERVER = "http://127.0.0.1:8805/sparql"
# Dedicated Redis DB index so test runs never touch production data.
TEST_REDIS_DB = 2
def reset_triplestore():
    """Delete every quad from every graph of the test triplestore."""
    endpoint = SPARQLWrapper(SERVER)
    endpoint.setMethod(POST)
    endpoint.setQuery("DELETE WHERE { GRAPH ?g { ?s ?p ?o } }")
    endpoint.query()
def reset_redis():
    """Flush the test Redis database, warning instead of failing when Redis is down."""
    try:
        client = redis.Redis(db=TEST_REDIS_DB, decode_responses=True)
        client.flushdb()
    except redis.ConnectionError:
        print("Warning: Redis not available for tests")
def save_failed_query_file(filename, failed_file):
    """Append *filename* on its own line to the failed-queries log.

    Args:
        filename: Name of the SPARQL file whose query failed.
        failed_file: Path of the log file; created if missing, appended otherwise.
    """
    # Bug fix: the previous version wrote the literal placeholder text
    # instead of the filename, so test_failed_query_logging could never
    # find the failed file's name in the log.
    with open(failed_file, "a", encoding="utf8") as f:
        f.write(f"{filename}\n")
class TestCacheManager(unittest.TestCase):
    """Tests for CacheManager JSON persistence and Redis mirroring."""

    def setUp(self):
        """Start each test with a clean temp directory and an empty Redis DB."""
        self.temp_dir = os.path.join("test", "temp_cache_test")
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
        os.makedirs(self.temp_dir)

        self.cache_file = os.path.join(self.temp_dir, "test_cache.json")
        reset_redis()

    def tearDown(self):
        """Remove the temp directory and flush Redis after each test."""
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
        reset_redis()

    def test_cache_initialization(self):
        """A pre-existing JSON cache file is loaded and mirrored into Redis."""
        initial_files = ["file1.sparql", "file2.sparql"]
        with open(self.cache_file, "w") as handle:
            json.dump(initial_files, handle)

        manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)

        # The JSON contents must be visible through the manager...
        self.assertEqual(manager.get_all(), set(initial_files))

        # ...and synchronized to Redis when a connection is available.
        if manager._redis:
            mirrored = manager._redis.smembers(CacheManager.REDIS_KEY)
            self.assertEqual(mirrored, set(initial_files))

    def test_add_and_contains(self):
        """Added files show up locally, in Redis, and via the `in` operator."""
        manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)
        test_file = "test.sparql"
        manager.add(test_file)

        self.assertIn(test_file, manager.processed_files)
        if manager._redis:
            self.assertTrue(
                manager._redis.sismember(CacheManager.REDIS_KEY, test_file)
            )
        self.assertIn(test_file, manager)

    def test_persistence(self):
        """Files saved by one manager instance are reloaded by a new one."""
        manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)
        test_files = ["test1.sparql", "test2.sparql"]
        for name in test_files:
            manager.add(name)

        # Force the JSON snapshot to disk.
        manager._cleanup()

        with open(self.cache_file, "r") as handle:
            saved = set(json.load(handle))
        self.assertEqual(saved, set(test_files))

        reloaded = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)
        self.assertEqual(reloaded.get_all(), set(test_files))

    @patch("redis.Redis")
    def test_redis_fallback(self, mock_redis):
        """When Redis is unreachable the manager falls back to JSON only."""
        mock_redis.side_effect = redis.ConnectionError()

        manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)

        # No Redis handle should be kept...
        self.assertIsNone(manager._redis)

        # ...yet the basic operations keep working.
        test_file = "test.sparql"
        manager.add(test_file)
        self.assertIn(test_file, manager)
class TestOnTriplestore(unittest.TestCase):
    """Integration tests for the triplestore upload helpers."""

    def setUp(self):
        """Create temp paths and reset external services; skip when unavailable."""
        self.temp_dir = os.path.join("test", "temp_triplestore_test")
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
        os.makedirs(self.temp_dir)

        self.cache_file = os.path.join(self.temp_dir, "ts_upload_cache.json")
        self.failed_file = os.path.join(self.temp_dir, "failed_queries.txt")
        self.stop_file = os.path.join(self.temp_dir, ".stop_upload")

        try:
            reset_triplestore()
            reset_redis()
        except Exception as e:
            self.skipTest(f"Triplestore o Redis non disponibile: {str(e)}")

    def tearDown(self):
        """Remove the temp directory after each test."""
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)

    def test_generate_sparql_queries(self):
        """Adds are split into INSERTs of `batch_size`; removes become DELETEs."""
        quads_to_add = [
            ("<subj1>", "<pred1>", '"obj1"', "<graph1>"),
            ("<subj2>", "<pred2>", '"obj2"', "<graph1>"),
            ("<subj3>", "<pred3>", '"obj3"', "<graph2>"),
        ]
        quads_to_remove = [("<subj4>", "<pred4>", '"obj4"', "<graph1>")]

        queries = generate_sparql_queries(quads_to_add, quads_to_remove, batch_size=2)

        # 3 adds at batch_size=2 -> 2 INSERT batches, plus 1 DELETE batch.
        self.assertEqual(len(queries), 3)
        self.assertTrue(any(q.startswith("INSERT DATA {") for q in queries))
        self.assertTrue(any(q.startswith("DELETE DATA {") for q in queries))

    def test_cache_operations(self):
        """CacheManager round-trips its contents through the JSON file."""
        manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)

        test_files = {"file1.sparql", "file2.sparql"}
        for name in test_files:
            manager.add(name)

        # Force the JSON snapshot to disk.
        manager._cleanup()
        self.assertTrue(os.path.exists(self.cache_file))

        reloaded = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)
        self.assertEqual(reloaded.get_all(), test_files)

    def test_failed_query_logging(self):
        """A failed file's name is appended to the failure log."""
        test_file = "failed_test.sparql"
        save_failed_query_file(test_file, self.failed_file)

        with open(self.failed_file, "r") as handle:
            self.assertIn(test_file, handle.read())

    def test_execute_sparql_update(self):
        """Valid updates succeed; syntactically invalid ones report failure."""
        valid_query = """
        INSERT DATA {
            GRAPH <http://test.graph> {
                <http://test.subject> <http://test.predicate> "test object" .
            }
        }
        """
        self.assertTrue(execute_sparql_update(SERVER, valid_query))

        invalid_query = "INVALID SPARQL QUERY"
        self.assertFalse(execute_sparql_update(SERVER, invalid_query))

    def test_upload_with_stop_file(self):
        """The presence of the stop file halts the upload before completion."""
        sparql_dir = os.path.join(self.temp_dir, "sparql_files")
        os.makedirs(sparql_dir)

        test_query = """
        INSERT DATA {
            GRAPH <http://test.graph> {
                <http://test.subject> <http://test.predicate> "test object" .
            }
        }
        """
        for i in range(3):
            with open(os.path.join(sparql_dir, f"test{i}.sparql"), "w") as handle:
                handle.write(test_query)

        # Request a stop before the upload even starts.
        with open(self.stop_file, "w") as handle:
            handle.write("")

        upload_sparql_updates(
            SERVER,
            sparql_dir,
            batch_size=10,
            cache_file=self.cache_file,
            failed_file=self.failed_file,
            stop_file=self.stop_file,
            cache_manager=CacheManager(self.cache_file, redis_db=TEST_REDIS_DB),
        )

        # Not all three files can have been processed.
        manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)
        self.assertLess(len(manager.get_all()), 3)

    def test_upload_with_failures(self):
        """Valid files are cached; invalid ones are logged as failures."""
        sparql_dir = os.path.join(self.temp_dir, "sparql_files")
        os.makedirs(sparql_dir)

        valid_query = """
        INSERT DATA {
            GRAPH <http://test.graph> {
                <http://test.subject> <http://test.predicate> "test object" .
            }
        }
        """
        with open(os.path.join(sparql_dir, "valid.sparql"), "w") as handle:
            handle.write(valid_query)

        invalid_query = "INVALID SPARQL QUERY"
        with open(os.path.join(sparql_dir, "invalid.sparql"), "w") as handle:
            handle.write(invalid_query)

        upload_sparql_updates(
            SERVER,
            sparql_dir,
            batch_size=10,
            cache_file=self.cache_file,
            failed_file=self.failed_file,
            stop_file=self.stop_file,
            cache_manager=CacheManager(self.cache_file, redis_db=TEST_REDIS_DB),
        )

        # Only the valid file may appear in the cache.
        manager = CacheManager(self.cache_file, redis_db=TEST_REDIS_DB)
        self.assertIn("valid.sparql", manager)
        self.assertNotIn("invalid.sparql", manager)

        # The invalid file must be recorded in the failure log.
        with open(self.failed_file, "r") as handle:
            self.assertIn("invalid.sparql", handle.read())
class TestTriplestoreConnection(unittest.TestCase):
    """Tests for the singleton TriplestoreConnection wrapper."""

    def setUp(self):
        self.endpoint = SERVER
        self.connection = TriplestoreConnection(self.endpoint)

    def test_singleton_pattern(self):
        """Constructing with the same endpoint yields the same instance."""
        other = TriplestoreConnection(self.endpoint)
        self.assertIs(self.connection, other)

    def test_connection_update(self):
        """Passing a new endpoint re-points the shared SPARQL wrapper."""
        new_endpoint = "http://new.endpoint/sparql"
        other = TriplestoreConnection(new_endpoint)
        self.assertEqual(other.sparql.endpoint, new_endpoint)

    def test_execute_update(self):
        """A valid INSERT DATA update executes successfully."""
        query = """
        INSERT DATA {
            GRAPH <http://test.graph> {
                <http://test.subject> <http://test.predicate> "test object" .
            }
        }
        """
        self.assertTrue(self.connection.execute_update(query))
# Run the whole suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()