Coverage for test/datacite_process_test.py: 71%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2024 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2024-2026 Marta Soricetti <marta.soricetti@unibo.it> 

3# SPDX-FileCopyrightText: 2025 Arianna Moretti <arianna.moretti4@unibo.it> 

4# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8import csv 

9import json 

10import os 

11import shutil 

12import unittest 

13 

14from oc_ds_converter.run.datacite_process import preprocess 

15 

16 

17class DataciteProcessTest(unittest.TestCase): 

18 

19 def setUp(self) -> None: 

20 self.test_dir = os.path.join("test",'datacite_process') 

21 self.json_dir = os.path.join(self.test_dir, 'jsonFiles') 

22 self.output_dir = os.path.join(self.test_dir, 'output_dir') 

23 self.processing_test_dir = os.path.join('test', 'datacite_processing') 

24 self.publisher_mapping = os.path.join(self.processing_test_dir, 'publishers.csv') 

25 self.wanted_dois = os.path.join(self.processing_test_dir, 'wanted_dois') 

26 self.iod = os.path.join(self.processing_test_dir, 'iod') 

27 self.cache = os.path.join(self.test_dir, 'cache.json') 

28 self.cache_test = os.path.join(self.test_dir, 'cache_test.json') 

29 self.db = os.path.join(self.test_dir, 'anydb.db') 

30 # percorso input con il file NDJSON malformato 

31 self.error_input_folder = os.path.join(self.test_dir, 'sample_dc_error') 

32 # percorsi per report errori 

33 self.bad_dir = os.path.join(self.output_dir, '_bad') 

34 self.citations_output_path = self.output_dir + "_citations" 

35 

36 def test_preprocess_base_decompress_and_read(self): 

37 """Test base functionalities of the Datacite processor for producing META csv tables and INDEX tables: 

38 1) All the files in input dir are correctly processed 

39 2) The number of files in input corresponds to the number of files in output for citations 

40 3) The number of files in input are duplicated in the output folder for both citing and cited entities 

41 """ 

42 

43 if os.path.exists(self.output_dir): 

44 shutil.rmtree(self.output_dir) 

45 

46 citations_output_path = self.output_dir + "_citations" 

47 if os.path.exists(citations_output_path): 

48 shutil.rmtree(citations_output_path) 

49 

50 # assicura corretto funzionamento di _bad 

51 bad_dir = os.path.join(self.output_dir, '_bad') 

52 if os.path.exists(bad_dir): 

53 shutil.rmtree(bad_dir) 

54 

55 if os.path.exists(self.db): 

56 os.remove(self.db) 

57 if os.path.exists(self.cache): 

58 os.remove(self.cache) 

59 

60 preprocess(datacite_json_dir=self.json_dir, publishers_filepath=self.publisher_mapping, 

61 orcid_doi_filepath=self.iod, csv_dir=self.output_dir, redis_storage_manager=False, 

62 storage_path=self.db, cache=self.cache) 

63 

64 citations_in_output = 0 

65 

66 for file in os.listdir(citations_output_path): 

67 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f: 

68 cits_rows = list(csv.DictReader(f)) 

69 citations_in_output += len(cits_rows) 

70 

71 #one self citation must not be considered 

72 expected_citations_in_output = 19 

73 

74 #excluding duplicated entities and one invalid doi 10.46979/rbn.v52i4.5546 

75 expected_entities_in_output = 22 

76 

77 self.assertEqual(expected_citations_in_output, citations_in_output) 

78 

79 citations_files_n = len(list(os.listdir(citations_output_path))) 

80 

81 shutil.rmtree(citations_output_path) 

82 

83 entities_in_meta_output = 0 

84 for file in os.listdir(self.output_dir): 

85 with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f: 

86 entities_in_meta_output += len(list(csv.DictReader(f))) 

87 

88 self.assertEqual(expected_entities_in_output, entities_in_meta_output) 

89 

90 input_files_n = 2 

91 self.assertTrue(citations_files_n == input_files_n) 

92 

93 # CLEAN: output, _bad, decompressioni e db 

94 if os.path.exists(self.output_dir): 

95 shutil.rmtree(self.output_dir) 

96 bad_dir = os.path.join(self.output_dir, '_bad') 

97 if os.path.exists(bad_dir): 

98 shutil.rmtree(bad_dir) 

99 if os.path.exists(self.db): 

100 os.remove(self.db) 

101 if os.path.exists(self.cache): 

102 os.remove(self.cache) 

103 

104 def test_preprocess_orcid_api_disabled_no_index(self): 

105 """ 

106 With the ORCID API disabled and without a DOI->ORCID index, 

107 ORCIDs must not appear in _subject.csv files. 

108 """ 

109 # Pre-clean 

110 if os.path.exists(self.output_dir): 

111 shutil.rmtree(self.output_dir) 

112 

113 citations_output_path = self.output_dir + "_citations" 

114 if os.path.exists(citations_output_path): 

115 shutil.rmtree(citations_output_path) 

116 

117 # assicura corretto funzionamento di _bad 

118 bad_dir = os.path.join(self.output_dir, '_bad') 

119 if os.path.exists(bad_dir): 

120 shutil.rmtree(bad_dir) 

121 

122 if os.path.exists(self.db): 

123 os.remove(self.db) 

124 if os.path.exists(self.cache): 

125 os.remove(self.cache) 

126 

127 # Run with API disabled and no index 

128 preprocess( 

129 datacite_json_dir=self.json_dir, 

130 publishers_filepath=self.publisher_mapping, 

131 orcid_doi_filepath=None, 

132 csv_dir=self.output_dir, 

133 cache=self.cache, 

134 use_orcid_api=False 

135 ) 

136 

137 # Verify: no "[orcid:" in any _subject.csv "author" field 

138 found_orcid = False 

139 for file in os.listdir(self.output_dir): 

140 if file.endswith("_subject.csv"): 

141 with open(os.path.join(self.output_dir, file), "r", encoding="utf-8") as f: 

142 for row in csv.DictReader(f): 

143 if "[orcid:" in (row.get("author", "") or ""): 

144 found_orcid = True 

145 break 

146 if found_orcid: 

147 break 

148 

149 self.assertFalse(found_orcid) 

150 

151 # Post-clean 

152 # CLEAN: output, _bad, decompressioni e db 

153 if os.path.exists(self.output_dir): 

154 shutil.rmtree(self.output_dir) 

155 bad_dir = os.path.join(self.output_dir, '_bad') 

156 if os.path.exists(bad_dir): 

157 shutil.rmtree(bad_dir) 

158 if os.path.exists(self.db): 

159 os.remove(self.db) 

160 if os.path.exists(self.cache): 

161 os.remove(self.cache) 

162 

163 

164 def test_preprocess_orcid_api_disabled_no_leak(self): 

165 """With ORCID API disabled, authors should not contain [orcid:] unless the DOI is in the provided index. 

166 Our sample input DOIs with authors having ORCID nameIdentifiers are not covered by the sample index (iod), 

167 so no [orcid:] should appear in the subject CSVs.""" 

168 

169 # Pre-clean 

170 if os.path.exists(self.output_dir): 

171 shutil.rmtree(self.output_dir) 

172 

173 citations_output_path = self.output_dir + "_citations" 

174 if os.path.exists(citations_output_path): 

175 shutil.rmtree(citations_output_path) 

176 

177 # assicura corretto funzionamento di _bad 

178 bad_dir = os.path.join(self.output_dir, '_bad') 

179 if os.path.exists(bad_dir): 

180 shutil.rmtree(bad_dir) 

181 

182 if os.path.exists(self.db): 

183 os.remove(self.db) 

184 if os.path.exists(self.cache): 

185 os.remove(self.cache) 

186 

187 # Run the process with ORCID API disabled 

188 preprocess( 

189 datacite_json_dir=self.json_dir, 

190 publishers_filepath=self.publisher_mapping, 

191 orcid_doi_filepath=self.iod, 

192 csv_dir=self.output_dir, 

193 cache=self.cache, 

194 use_orcid_api=False, 

195 ) 

196 

197 # Scan subject CSVs and ensure authors contain no “[orcid:” token 

198 subject_rows = 0 

199 orcid_mentions = 0 

200 for fname in os.listdir(self.output_dir): 

201 if fname.endswith("_subject.csv"): 

202 with open(os.path.join(self.output_dir, fname), encoding="utf-8") as f: 

203 rdr = csv.DictReader(f) 

204 for row in rdr: 

205 subject_rows += 1 

206 if "[orcid:" in row.get("author", ""): 

207 orcid_mentions += 1 

208 

209 self.assertGreater(subject_rows, 0) 

210 self.assertEqual(orcid_mentions, 0) 

211 

212 # Post-clean 

213 # CLEAN: output, _bad, decompressioni e db 

214 if os.path.exists(self.output_dir): 

215 shutil.rmtree(self.output_dir) 

216 bad_dir = os.path.join(self.output_dir, '_bad') 

217 if os.path.exists(bad_dir): 

218 shutil.rmtree(bad_dir) 

219 if os.path.exists(self.db): 

220 os.remove(self.db) 

221 if os.path.exists(self.cache): 

222 os.remove(self.cache) 

223 

224 def test_any_db_creation_redis_no_testing(self): 

225 

226 # Pre-clean 

227 if os.path.exists(self.output_dir): 

228 shutil.rmtree(self.output_dir) 

229 

230 citations_output_path = self.output_dir + "_citations" 

231 if os.path.exists(citations_output_path): 

232 shutil.rmtree(citations_output_path) 

233 

234 # assicura corretto funzionamento di _bad 

235 bad_dir = os.path.join(self.output_dir, '_bad') 

236 if os.path.exists(bad_dir): 

237 shutil.rmtree(bad_dir) 

238 

239 if os.path.exists(self.db): 

240 os.remove(self.db) 

241 if os.path.exists(self.cache): 

242 os.remove(self.cache) 

243 

244 try: 

245 rsm = RedisStorageManager(testing=False) 

246 rsm.set_value("TEST VALUE", False) 

247 run_test = True 

248 except: 

249 run_test = False 

250 print("test skipped: 'test_any_db_creation_redis_no_testing': Connect to redis before running the test") 

251 

252 if run_test: 

253 rsm.del_value("TEST VALUE") 

254 if not len(rsm.get_all_keys()): 

255 preprocess(datacite_json_dir=self.json_dir, publishers_filepath=self.publisher_mapping, 

256 orcid_doi_filepath=self.iod, csv_dir=self.output_dir, redis_storage_manager=True, 

257 storage_path=self.db, cache=self.cache) 

258 

259 rsm.delete_storage() 

260 

261 else: 

262 

263 print("test skipped: 'test_storage_management_no_testing' because redis db 2 is not empty") 

264 

265 # Post-clean 

266 # CLEAN: output, _bad, decompressioni e db 

267 if os.path.exists(self.output_dir): 

268 shutil.rmtree(self.output_dir) 

269 bad_dir = os.path.join(self.output_dir, '_bad') 

270 if os.path.exists(bad_dir): 

271 shutil.rmtree(bad_dir) 

272 if os.path.exists(self.db): 

273 os.remove(self.db) 

274 if os.path.exists(self.cache): 

275 os.remove(self.cache) 

276 

277 def test_cache(self): 

278 'Nothing should be produced in output, since the cache file reports that all the files in input were completed' 

279 

280 # Pre-clean 

281 if os.path.exists(self.output_dir): 

282 shutil.rmtree(self.output_dir) 

283 

284 citations_output_path = self.output_dir + "_citations" 

285 if os.path.exists(citations_output_path): 

286 shutil.rmtree(citations_output_path) 

287 

288 # assicura corretto funzionamento di _bad 

289 bad_dir = os.path.join(self.output_dir, '_bad') 

290 if os.path.exists(bad_dir): 

291 shutil.rmtree(bad_dir) 

292 

293 if os.path.exists(self.db): 

294 os.remove(self.db) 

295 if os.path.exists(self.cache): 

296 os.remove(self.cache) 

297 

298 with open(self.cache_test, "w", encoding="utf-8") as write_cache: 

299 processed_files_dict = {'first_iteration': ['jSonFile_1', 'jSonFile_2'], 

300 'second_iteration': ['jSonFile_1', 'jSonFile_2']} 

301 json.dump(processed_files_dict, write_cache) 

302 

303 preprocess(datacite_json_dir=self.json_dir, publishers_filepath=self.publisher_mapping, 

304 orcid_doi_filepath=self.iod, csv_dir=self.output_dir, redis_storage_manager=False, 

305 storage_path=self.db, cache=self.cache_test) 

306 

307 citations_in_output = 0 

308 encountered_ids = set() 

309 unique_entities = 0 

310 

311 for file in os.listdir(citations_output_path): 

312 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f: 

313 cits_rows = list(csv.DictReader(f)) 

314 citations_in_output += len(cits_rows) 

315 for x in cits_rows: 

316 citing_ids = x["citing"].split(" ") 

317 citied_ids = x["cited"].split(" ") 

318 if all(id not in encountered_ids for id in citing_ids): 

319 unique_entities += 1 

320 encountered_ids.update(citing_ids) 

321 

322 if all(id not in encountered_ids for id in citied_ids): 

323 unique_entities += 1 

324 encountered_ids.update(citied_ids) 

325 

326 expected_citations_in_output = 0 

327 

328 expected_entities_in_output = 0 

329 

330 self.assertEqual(expected_entities_in_output, unique_entities) 

331 self.assertEqual(expected_citations_in_output, citations_in_output) 

332 

333 # Post-clean 

334 # CLEAN: output, _bad, decompressioni e db 

335 if os.path.exists(self.output_dir): 

336 shutil.rmtree(self.output_dir) 

337 bad_dir = os.path.join(self.output_dir, '_bad') 

338 if os.path.exists(bad_dir): 

339 shutil.rmtree(bad_dir) 

340 if os.path.exists(self.db): 

341 os.remove(self.db) 

342 if os.path.exists(self.cache): 

343 os.remove(self.cache) 

344 

# Allow running this test module directly: ``python datacite_process_test.py``.
if __name__ == '__main__':
    unittest.main()