Coverage for test / idm_jid_test.py: 99%

166 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023-2026 Marta Soricetti <marta.soricetti@unibo.it> 

2# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# 

4# SPDX-License-Identifier: ISC 

5 

6import json 

7import unittest 

8from os import makedirs, remove 

9from os.path import exists, join 

10 

11from oc_ds_converter.oc_idmanager.jid import JIDManager 

12from oc_ds_converter.oc_idmanager.oc_data_storage.sqlite_manager import SqliteStorageManager 

13from oc_ds_converter.oc_idmanager.oc_data_storage.in_memory_manager import InMemoryStorageManager 

14from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager 

15 

16 

17class JidIdentifierManagerTest(unittest.TestCase): 

18 """This class aim at testing jid identifiers manager.""" 

19 

20 def setUp(self): 

21 if not exists("tmp"): 

22 makedirs("tmp") 

23 

24 self.test_dir = join("test","data") 

25 self.test_json_path = join(self.test_dir, "glob.json") 

26 with open(self.test_json_path, encoding="utf-8") as fp: 

27 self.data = json.load(fp) 

28 

29 self.valid_jid_1 = "otoljpn1970" 

30 self.valid_jid_2 = "jscej1944b" 

31 self.valid_jid_3 = "japeoj" # SYS_ERR_009 

32 self.invalid_jid_1 = "hjmee" 

33 self.invalid_jid_2 = "gee1973e" # saved in glob as invalid 

34 

35 def test_jid_normalise(self): 

36 jm = JIDManager() 

37 self.assertEqual( 

38 self.valid_jid_1, jm.normalise(self.valid_jid_1.replace("", " ")) 

39 ) 

40 self.assertEqual( 

41 self.valid_jid_2, jm.normalise("jid:" + self.valid_jid_2) 

42 ) 

43 

44 def test_jid_syntax_ok(self): 

45 jm = JIDManager() 

46 self.assertTrue(jm.syntax_ok(self.valid_jid_1)) 

47 self.assertTrue(jm.syntax_ok(self.invalid_jid_1)) 

48 self.assertFalse(jm.syntax_ok('1950' + self.valid_jid_1)) 

49 

50 def test_jid_is_valid(self): 

51 jm = JIDManager() 

52 self.assertTrue(jm.is_valid(self.valid_jid_1)) 

53 self.assertTrue(jm.is_valid(self.valid_jid_2)) 

54 self.assertFalse(jm.is_valid(self.invalid_jid_1)) 

55 self.assertFalse(jm.is_valid(self.invalid_jid_2)) 

56 

57 jm_file = JIDManager(testing=True) 

58 self.assertTrue(jm_file.normalise(self.valid_jid_1, include_prefix=True) in self.data) 

59 self.assertTrue(jm_file.normalise(self.valid_jid_2, include_prefix=True) in self.data) 

60 self.assertTrue(jm_file.normalise(self.valid_jid_3, include_prefix=True) in self.data) 

61 self.assertTrue(jm_file.is_valid(jm_file.normalise(self.valid_jid_1, include_prefix=True))) 

62 self.assertTrue(jm_file.is_valid(jm_file.normalise(self.valid_jid_2, include_prefix=True))) 

63 self.assertTrue(jm_file.is_valid(jm_file.normalise(self.valid_jid_3, include_prefix=True))) 

64 

65 jm_nofile_noapi = JIDManager(testing=True, use_api_service=False) 

66 self.assertTrue(jm_nofile_noapi.is_valid(self.valid_jid_1)) 

67 self.assertTrue(jm_nofile_noapi.is_valid(self.invalid_jid_1)) 

68 

69 def test_jid_exists(self): 

70 with self.subTest(msg="get_extra_info = True, allow_extra_api=None"): 

71 jm = JIDManager() 

72 output = jm.exists(self.valid_jid_1, get_extra_info=True, allow_extra_api=None) 

73 expected_output = (True, {"valid": True}) 

74 self.assertEqual(output, expected_output) 

75 with self.subTest(msg="get_extra_info = True, allow_extra_api=None"): 

76 jm = JIDManager() 

77 output = jm.exists(self.valid_jid_3, get_extra_info=True, allow_extra_api=None) 

78 expected_output = (True, {"valid": True}) 

79 self.assertEqual(output, expected_output) 

80 with self.subTest(msg="get_extra_info = True, allow_extra_api=None"): 

81 jm = JIDManager() 

82 output = jm.exists(self.invalid_jid_1, get_extra_info=True, allow_extra_api=None) 

83 expected_output = (False, {"valid": False}) 

84 self.assertEqual(output, expected_output) 

85 

86 def test_jid_default(self): 

87 jm_nofile = JIDManager(testing=True) 

88 # Uses RedisStorageManager with testing=True (fakeredis) 

89 # uses API 

90 self.assertTrue(jm_nofile.is_valid(self.valid_jid_1)) 

91 self.assertTrue(jm_nofile.is_valid(self.valid_jid_2)) 

92 self.assertTrue(jm_nofile.is_valid(self.valid_jid_3)) 

93 self.assertFalse(jm_nofile.is_valid(self.invalid_jid_1)) 

94 self.assertFalse(jm_nofile.is_valid(self.invalid_jid_2)) 

95 validated_ids = [self.valid_jid_1, self.valid_jid_2, self.valid_jid_3, self.invalid_jid_1, self.invalid_jid_2] 

96 # check that all the validated ids are stored in redis 

97 all_ids_stored = jm_nofile.storage_manager.get_all_keys() 

98 self.assertTrue(all(jm_nofile.normalise(x, include_prefix=True) in all_ids_stored for x in validated_ids)) 

99 jm_nofile.storage_manager.delete_storage() 

100 # check that the storage was correctly deleted 

101 self.assertEqual(jm_nofile.storage_manager.get_all_keys(), set()) 

102 

103 #### IN MEMORY STORAGE MANAGER 

104 

105 def test_jid_memory_file_noapi(self): 

106 # Uses pre-seeded data (without updating it) 

107 # Uses RedisStorageManager storage manager 

108 # does not use API (so a syntactically correct id is considered to be valid) 

109 jm_file = JIDManager(testing=True, use_api_service=False) 

110 # Pre-seed storage with data from glob.json 

111 for key, value in self.data.items(): 

112 if key.startswith("jid:"): 

113 jm_file.storage_manager.set_value(key, value.get("valid", False)) 

114 self.assertTrue(jm_file.normalise(self.valid_jid_1.replace("", " "), include_prefix=True) in self.data) 

115 self.assertTrue(jm_file.normalise("jid:" + self.valid_jid_2, include_prefix=True) in self.data) 

116 self.assertTrue(jm_file.is_valid(self.valid_jid_1)) 

117 self.assertFalse(jm_file.is_valid(self.invalid_jid_2)) # is stored as invalid 

118 self.assertTrue(jm_file.is_valid("jid:pjab1978")) # not stored as invalid, has correct syntax 

119 

120 def test_jid_memory_file_api(self): 

121 # Uses support file (without updating it) 

122 # Uses RedisStorageManager storage manager 

123 # uses API (so a syntactically correct id which is not valid is considered to be invalid) 

124 jm_file = JIDManager(testing=True, use_api_service=True) 

125 self.assertFalse(jm_file.is_valid("jid:pjab1978")) 

126 

127 def test_jid_memory_nofile_noapi(self): 

128 # Does not use support file 

129 # Uses RedisStorageManager storage manager 

130 # Does not use API (so a syntactically correct id which is not valid is considered to be valid) 

131 jm_nofile_noapi = JIDManager(testing=True, use_api_service=False) 

132 self.assertTrue(jm_nofile_noapi.is_valid(self.valid_jid_1)) 

133 self.assertTrue(jm_nofile_noapi.is_valid(self.invalid_jid_1)) 

134 jm_nofile_noapi.storage_manager.delete_storage() 

135 

136 #### SQLITE STORAGE MANAGER 

137 

138 def test_jid_sqlite_nofile_api(self): 

139 # No pre-existing data 

140 # Uses RedisStorageManager 

141 # uses API 

142 sql_jm_nofile = JIDManager(testing=True) 

143 self.assertTrue(sql_jm_nofile.is_valid(self.valid_jid_1)) 

144 self.assertTrue(sql_jm_nofile.is_valid(self.valid_jid_2)) 

145 self.assertTrue(sql_jm_nofile.is_valid(self.valid_jid_3)) 

146 self.assertFalse(sql_jm_nofile.is_valid(self.invalid_jid_1)) 

147 self.assertFalse(sql_jm_nofile.is_valid(self.invalid_jid_2)) 

148 # check that the redis storage contains all the validated ids 

149 validated_ids = [self.valid_jid_1, self.valid_jid_2, self.valid_jid_3, self.invalid_jid_1, self.invalid_jid_2] 

150 all_ids_stored = sql_jm_nofile.storage_manager.get_all_keys() 

151 self.assertTrue(all(sql_jm_nofile.normalise(x, include_prefix=True) in all_ids_stored for x in validated_ids)) 

152 sql_jm_nofile.storage_manager.delete_storage() 

153 # check that the storage was correctly deleted 

154 self.assertEqual(sql_jm_nofile.storage_manager.get_all_keys(), set()) 

155 

156 def test_jid_sqlite_file_api(self): 

157 # Uses support file 

158 # Uses SqliteStorageManager 

159 # does not use API (so a syntactically correct id is considered to be valid) 

160 # db creation 

161 test_sqlite_db = join(self.test_dir, "database.db") 

162 if exists(test_sqlite_db): 

163 remove(test_sqlite_db) 

164 to_insert = [self.invalid_jid_1, self.valid_jid_1, self.valid_jid_3] 

165 sql_file = JIDManager(testing=True, use_api_service=True) 

166 for jid in to_insert: 

167 norm_id = sql_file.normalise(jid, include_prefix=True) 

168 is_valid = sql_file.is_valid(norm_id) 

169 sql_file.storage_manager.set_value(norm_id, is_valid) 

170 

171 sql_no_api = JIDManager(testing=True, use_api_service=False) 

172 # Copy values from the first manager to the second for testing 

173 for jid in to_insert: 

174 norm_id = sql_no_api.normalise(jid, include_prefix=True) 

175 value = sql_file.storage_manager.get_value(norm_id) 

176 if value is not None: 

177 sql_no_api.storage_manager.set_value(norm_id, value) 

178 all_db_keys = sql_no_api.storage_manager.get_all_keys() 

179 # check that all the normalised ids in the list were correctly inserted 

180 self.assertTrue(all(sql_no_api.normalise(x, include_prefix=True) in all_db_keys for x in to_insert)) 

181 self.assertTrue(sql_no_api.is_valid(self.valid_jid_1)) # is stored as valid 

182 self.assertTrue(sql_no_api.is_valid(self.valid_jid_3)) # is stored as valid 

183 self.assertFalse(sql_no_api.is_valid(self.invalid_jid_1)) # is stored as invalid 

184 self.assertTrue(sql_no_api.is_valid("jid:pjab1978")) # not stored as invalid, has correct syntax 

185 sql_no_api.storage_manager.delete_storage() 

186 

187 def test_jid_sqlite_nofile_noapi(self): 

188 # Does not use support file 

189 # Uses RedisStorageManager 

190 # Does not use API (so a syntactically correct id which is not valid is considered to be valid) 

191 jm_nofile_noapi = JIDManager(testing=True, use_api_service=False) 

192 self.assertTrue(jm_nofile_noapi.is_valid(self.valid_jid_1)) 

193 self.assertTrue(jm_nofile_noapi.is_valid(self.invalid_jid_1)) 

194 jm_nofile_noapi.storage_manager.delete_storage() 

195 

196 #### REDIS STORAGE MANAGER 

197 def test_jid_redis_nofile_api(self): 

198 # No available data in redis db 

199 # Storage manager : RedisStorageManager 

200 # uses API 

201 jm_nofile = JIDManager(storage_manager=RedisStorageManager(testing=True)) 

202 self.assertTrue(jm_nofile.is_valid(self.valid_jid_1)) 

203 self.assertTrue(jm_nofile.is_valid(self.valid_jid_2)) 

204 self.assertTrue(jm_nofile.is_valid(self.valid_jid_3)) 

205 self.assertFalse(jm_nofile.is_valid(self.invalid_jid_1)) 

206 self.assertFalse(jm_nofile.is_valid(self.invalid_jid_2)) 

207 # check that the redis db was correctly filled and that it contains all the validated ids 

208 

209 validated_ids = {self.valid_jid_1, self.valid_jid_2, self.valid_jid_3, self.invalid_jid_1, 

210 self.invalid_jid_2} 

211 validated_ids = {jm_nofile.normalise(x, include_prefix=True) for x in validated_ids} 

212 all_ids_stored = jm_nofile.storage_manager.get_all_keys() 

213 # check that all the validated ids are stored in the json file 

214 self.assertEqual(validated_ids, all_ids_stored) 

215 jm_nofile.storage_manager.delete_storage() 

216 # check that the support file was correctly deleted 

217 self.assertEqual(jm_nofile.storage_manager.get_all_keys(), set()) 

218 

219 def test_jid_redis_file_api(self): 

220 # Uses data in redis db 

221 # Uses RedisStorageManager 

222 # fills db 

223 

224 #use API to save validity values 

225 to_insert = [self.invalid_jid_1, self.valid_jid_1, self.valid_jid_3] 

226 storage_manager = RedisStorageManager(testing=True) 

227 redis_file = JIDManager(storage_manager=storage_manager, use_api_service=True) 

228 for id in to_insert: 

229 norm_id = redis_file.normalise(id, include_prefix=True) 

230 is_valid = redis_file.is_valid(norm_id) 

231 # insert_tup = (norm_id, is_valid) 

232 redis_file.storage_manager.set_value(norm_id, is_valid) 

233 

234 # does not use API, retrieve values from DB 

235 redis_no_api = JIDManager(storage_manager=storage_manager, use_api_service=False) 

236 all_db_keys = redis_no_api.storage_manager.get_all_keys() 

237 # check that all the normalised ids in the list were correctly inserted in the db 

238 self.assertTrue(all(redis_no_api.normalise(x, include_prefix=True) in all_db_keys for x in to_insert)) 

239 self.assertTrue(redis_no_api.is_valid(self.valid_jid_1)) # is stored in support file as valid 

240 self.assertTrue(redis_no_api.is_valid(self.valid_jid_2)) # is stored in support file as valid 

241 self.assertFalse(redis_no_api.is_valid(self.invalid_jid_1)) # is stored in support file as invalid 

242 self.assertTrue(redis_no_api.is_valid(self.invalid_jid_2)) # is not stored in support file as invalid, does not exist but has correct syntax 

243 redis_no_api.storage_manager.delete_storage() 

244 

245 def test_jid_redis_nofile_noapi(self): 

246 # No data in redis db 

247 # Uses RedisStorageManager 

248 # Does not use API (so a syntactically correct id which is not valid is considered to be valid) 

249 jm_nofile_noapi = JIDManager(storage_manager=RedisStorageManager(testing=True), use_api_service=False) 

250 self.assertTrue(jm_nofile_noapi.is_valid(self.valid_jid_1)) 

251 self.assertTrue(jm_nofile_noapi.is_valid(self.invalid_jid_1)) 

252 

253 jm_nofile_noapi.storage_manager.delete_storage()