Coverage for test/gen_info_dir_test.py: 97%

29 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

1import unittest 

2import os 

3from oc_meta.run.gen_info_dir import explore_directories 

4from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler 

5from oc_ocdm.support import get_count 

6import shutil 

7import redis 

8 

class TestGenInfoDir(unittest.TestCase):
    """Exercise ``explore_directories`` and check the counters it leaves in Redis.

    The test talks to a Redis server at ``localhost:6379``, database ``0``,
    and calls ``flushdb()`` on that database before and after every test.
    """

    def setUp(self):
        """Store the fixture path and Redis settings, connect, and flush the database."""
        self.root_dir = os.path.join('test', 'gen_info_dir', 'rdf')
        self.redis_host, self.redis_port, self.redis_db = 'localhost', 6379, 0
        self.redis_client = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
        )
        # Start from an empty database so the asserted counter values are
        # not influenced by keys left over from earlier runs.
        self.redis_client.flushdb()

    def tearDown(self):
        """Flush the database again and close the client connection."""
        client = self.redis_client
        client.flushdb()
        client.close()

    def test_explore_directories(self):
        """Run ``explore_directories`` on the fixture tree and verify the stored counters."""
        explore_directories(self.root_dir, self.redis_host, self.redis_port, self.redis_db)

        # Read the counters back through a handler bound to the same Redis database.
        connection = {
            'host': self.redis_host,
            'port': self.redis_port,
            'db': self.redis_db,
        }
        handler = RedisCounterHandler(**connection)

        # Main counter for "br" entities under supplier prefix "0670".
        main_br_count = handler.read_counter("br", supplier_prefix="0670")
        self.assertEqual(main_br_count, 386000)

        # Provenance ("se") counters for two specific "br" entities; both are
        # read before either assertion runs, in the order listed here.
        snapshots_101, snapshots_3 = (
            handler.read_counter(
                entity_short_name="br",
                prov_short_name="se",
                identifier=int(get_count(entity_uri)),
                supplier_prefix="0670",
            )
            for entity_uri in (
                'https://w3id.org/oc/meta/br/0670101',
                'https://w3id.org/oc/meta/br/06703',
            )
        )
        self.assertEqual(snapshots_101, 2)
        self.assertEqual(snapshots_3, 1)

46 

# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()