Coverage for test/gen_info_dir_test.py: 97%
29 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
1import unittest
2import os
3from oc_meta.run.gen_info_dir import explore_directories
4from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
5from oc_ocdm.support import get_count
6import shutil
7import redis
class TestGenInfoDir(unittest.TestCase):
    """Integration test for ``explore_directories`` backed by a local Redis.

    NOTE: ``setUp`` and ``tearDown`` both call ``flushdb()`` on database 0 of
    the Redis server at localhost:6379, so this test must not be pointed at
    an instance that holds data worth keeping.
    """

    def setUp(self):
        """Record the fixture path and Redis settings, then flush the database."""
        self.root_dir = os.path.join('test', 'gen_info_dir', 'rdf')
        self.redis_host, self.redis_port, self.redis_db = 'localhost', 6379, 0
        self.redis_client = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
        )
        # Start every test from an empty database.
        self.redis_client.flushdb()

    def tearDown(self):
        """Flush the database again and close the client connection."""
        client = self.redis_client
        client.flushdb()
        client.close()

    def test_explore_directories(self):
        """Run ``explore_directories`` on the fixtures and verify the counters."""
        connection = dict(
            host=self.redis_host,
            port=self.redis_port,
            db=self.redis_db,
        )
        explore_directories(
            self.root_dir,
            connection['host'],
            connection['port'],
            connection['db'],
        )

        counter_handler = RedisCounterHandler(**connection)

        # Main counter: "br" entities under supplier prefix "0670".
        self.assertEqual(
            counter_handler.read_counter("br", supplier_prefix="0670"),
            386000,
        )

        def read_prov_counter(br_uri):
            # Provenance ("se") counter of the "br" entity identified by br_uri.
            return counter_handler.read_counter(
                entity_short_name="br",
                prov_short_name="se",
                identifier=int(get_count(br_uri)),
                supplier_prefix="0670",
            )

        # Provenance counters: read both first, then assert, in that order.
        observed_101 = read_prov_counter('https://w3id.org/oc/meta/br/0670101')
        observed_3 = read_prov_counter('https://w3id.org/oc/meta/br/06703')
        self.assertEqual(observed_101, 2)
        self.assertEqual(observed_3, 1)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()