Coverage for test/openaire_process_test.py: 88%

239 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023-2024 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# 

4# SPDX-License-Identifier: ISC 

5 

6import os.path 

7import shutil 

8import unittest 

9from os.path import join 

10from oc_ds_converter.run.openaire_process import * 

11 

12 

class OpenAireProcessTest(unittest.TestCase):
    """Integration tests for the OpenAIRE ``preprocess`` pipeline.

    The tests run the processor over small TAR fixtures and verify that:
      * every file inside the input TARs is processed;
      * the number of output files (META tables and citation tables) matches
        the number of input files;
      * the number of unique bibliographic entities matches the number of
        citations * 2 (citing + cited), minus duplicates where applicable.
    """

    maxDiff = None

    def setUp(self):
        # Fixture locations under test/openaire_process.
        self.test_dir = join("test", "openaire_process")
        self.sample_1tar = join(self.test_dir, "1_tar_sample")
        self.sample_dupl = join(self.test_dir, "duplicates_sample")
        self.sample_2tar = join(self.test_dir, "2_tar_sample")
        self.sample_1tar_alt = join(self.test_dir, "1alt_tar_sample")
        self.output_dir = join(self.test_dir, "tmp")
        self.support_mat = join(self.test_dir, "support_mat")
        self.cache_test1 = join(self.support_mat, "cache_1.json")
        self.doi_orcid = join("test", "openaire_processing", "iod")

        self.any_db = join("test", "openaire_process", "anydb.db")

        self.publishers_file = join(self.support_mat, "publishers.json")
        self.journals_file = join(self.support_mat, "journals.json")

        # Directories that some tests create and are expected to delete.
        self.publishers_dir_todel = join(self.support_mat, "publishers")
        self.publishers_file_todel = join(self.publishers_dir_todel, "publishers.json")

        self.journals_dir_todel = join(self.support_mat, "journals")
        self.journals_file_todel = join(self.journals_dir_todel, "journals.json")

        # Synthetic material for edge-case tests.
        self.madeup_data_dir = join(self.support_mat, "made_up_mat")
        self.madeup_publishers = join(self.madeup_data_dir, "publishers.json")
        self.madeup_journals = join(self.madeup_data_dir, "journals.json")
        self.madeup_input = join(self.madeup_data_dir, "input")
        self.madeup_iod = join(self.madeup_data_dir, "iod")

        self.input_dirt_short = join(self.test_dir, "csv_files_short")
        self.input_dirt_iod = join(self.test_dir, "csv_file_iod")
        self.input_dirt_sample = join(self.test_dir, "csv_files_sample")
        self.input_dirt_compr = join(self.test_dir, "CSV_iCiteMD_zipped.zip")

        self.processing_csv_row_base = os.path.join('test', 'openaire_processing')
        self._id_orcid_data = os.path.join(self.processing_csv_row_base, 'iod')
        # preprocess() writes its progress cache into the current working dir.
        self.cache = os.path.join(os.getcwd(), "cache.json")

    def tearDown(self):
        # Remove any cache file left in the CWD: stale cache state from a
        # failed/interrupted run would make preprocess() skip all input in
        # the next test run and produce spurious assertion failures.
        if os.path.exists(self.cache):
            os.remove(self.cache)

    # ------------------------------------------------------------------
    # Private helpers (shared by all tests; previously duplicated inline)
    # ------------------------------------------------------------------

    def _remove_decompressed_dirs(self, sample_dir):
        """Delete leftover ``*decompr_zip_dir`` folders from earlier runs."""
        for el in os.listdir(sample_dir):
            if el.endswith("decompr_zip_dir"):
                shutil.rmtree(os.path.join(sample_dir, el))

    def _reset_output_dirs(self):
        """Remove META and citation output dirs; return the citations dir path."""
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)
        citations_output_path = self.output_dir + "_citations"
        if os.path.exists(citations_output_path):
            shutil.rmtree(citations_output_path)
        return citations_output_path

    def _count_citations_and_entities(self, citations_output_path):
        """Scan all citation CSVs in *citations_output_path*.

        Returns a tuple ``(n_files, n_citations, n_unique_entities)`` where an
        entity is counted as new only if none of its ids was seen before
        (ids within one "citing"/"referenced" cell are space-separated).
        """
        citations_in_output = 0
        encountered_ids = set()
        unique_entities = 0
        cits_files = list(os.listdir(citations_output_path))
        for file in cits_files:
            with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
                cits_rows = list(csv.DictReader(f))
                citations_in_output += len(cits_rows)
                for x in cits_rows:
                    citing_ids = x["citing"].split(" ")
                    citied_ids = x["referenced"].split(" ")
                    if all(id not in encountered_ids for id in citing_ids):
                        unique_entities += 1
                        encountered_ids.update(citing_ids)

                    if all(id not in encountered_ids for id in citied_ids):
                        unique_entities += 1
                        encountered_ids.update(citied_ids)
        return len(cits_files), citations_in_output, unique_entities

    def _count_meta_rows(self):
        """Return the total number of rows across all META CSV output files."""
        entities_in_meta_output = 0
        for file in os.listdir(self.output_dir):
            with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
                entities_in_meta_output += len(list(csv.DictReader(f)))
        return entities_in_meta_output

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_preprocess_base_decompress_and_read(self):
        """Test base functionalities of the OROCI processor for producing META csv tables and INDEX tables:
        1) All the files in the TARs in input are correctly processed
        2) The number of files in input corresponds to the number of files in output (both for citation and for meta tables)
        3) The number of bibliographic entities corresponds to the number of citations in input *2 (citing + cited)
        """
        self._remove_decompressed_dirs(self.sample_2tar)
        citations_output_path = self._reset_output_dirs()

        preprocess(openaire_json_dir=self.sample_2tar, csv_dir=self.output_dir, publishers_filepath=self.publishers_file, orcid_doi_filepath=self.doi_orcid, cache=self.cache)

        citations_files_n, citations_in_output, unique_entities = \
            self._count_citations_and_entities(citations_output_path)

        expected_citations_in_output = 2 * 3 * 4
        # 2 tar * 3 files * 4 citations

        expected_entities_in_output = 2 * 3 * 4 * 2
        # 2 tar * 3 files * 4 citations * 2 entities

        self.assertEqual(expected_entities_in_output, unique_entities)
        self.assertEqual(expected_citations_in_output, citations_in_output)

        shutil.rmtree(citations_output_path)

        meta_files_n = len(list(os.listdir(self.output_dir)))

        # Make sure that a meta table row was created for each entity.
        entities_in_meta_output = self._count_meta_rows()

        self.assertEqual(expected_entities_in_output, entities_in_meta_output)
        self.assertEqual(unique_entities, entities_in_meta_output)

        # make sure that for each of the input files was created a citation file and a meta input file
        self.assertTrue(meta_files_n == citations_files_n == 6)

        shutil.rmtree(self.output_dir)
        self._remove_decompressed_dirs(self.sample_2tar)

    def test_preprocess_base_decompress_and_read_redis_test(self):
        """Test base functionalities of the OROCI processor for producing META csv tables and INDEX tables:
        1) All the files in the TARs in input are correctly processed
        2) The number of files in input corresponds to the number of files in output (both for citation and for meta tables)
        3) The number of bibliographic entities corresponds to the number of citations in input *2 (citing + cited)
        """
        self._remove_decompressed_dirs(self.sample_2tar)
        citations_output_path = self._reset_output_dirs()

        preprocess(openaire_json_dir=self.sample_2tar, csv_dir=self.output_dir, publishers_filepath=self.publishers_file, orcid_doi_filepath=self.doi_orcid, cache=self.cache)

        citations_files_n, citations_in_output, unique_entities = \
            self._count_citations_and_entities(citations_output_path)

        expected_citations_in_output = 2 * 3 * 4
        # 2 tar * 3 files * 4 citations

        expected_entities_in_output = 2 * 3 * 4 * 2
        # 2 tar * 3 files * 4 citations * 2 entities

        self.assertEqual(expected_entities_in_output, unique_entities)
        self.assertEqual(expected_citations_in_output, citations_in_output)

        shutil.rmtree(citations_output_path)

        meta_files_n = len(list(os.listdir(self.output_dir)))

        # Make sure that a meta table row was created for each entity.
        entities_in_meta_output = self._count_meta_rows()

        self.assertEqual(expected_entities_in_output, entities_in_meta_output)
        self.assertEqual(unique_entities, entities_in_meta_output)

        # make sure that for each of the input files was created a citation file and a meta input file
        self.assertTrue(meta_files_n == citations_files_n == 6)

        shutil.rmtree(self.output_dir)
        self._remove_decompressed_dirs(self.sample_2tar)

    def test_preprocess_duplicates_management(self):
        """Test functionalities of the OROCI processor for producing META csv tables and INDEX tables, when multiple
        citations with a common id involved are processed. Expected output, given two citations with the same citing
        entity: three rows in meta table, two rows in citations tables
        1) All the files in the TARs in input are correctly processed
        2) The number of files in input corresponds to the number of files in output (both for citation and for meta tables)
        3) The number of bibliographic entities corresponds to the number of citations in input *2 (citing + cited)
        """
        self._remove_decompressed_dirs(self.sample_dupl)
        citations_output_path = self._reset_output_dirs()

        preprocess(openaire_json_dir=self.sample_dupl, csv_dir=self.output_dir, publishers_filepath=self.publishers_file, orcid_doi_filepath=self.doi_orcid, cache=self.cache, max_workers=2)

        _, citations_in_output, unique_entities = \
            self._count_citations_and_entities(citations_output_path)

        expected_citations_in_output = 2
        # since the citing entity is the same for both citations

        expected_entities_in_output = 3
        # since the citing entity is the same for both citations

        self.assertEqual(expected_entities_in_output, unique_entities)
        self.assertEqual(expected_citations_in_output, citations_in_output)

        shutil.rmtree(citations_output_path)
        shutil.rmtree(self.output_dir)

        self._remove_decompressed_dirs(self.sample_dupl)
        if os.path.exists(self.any_db):
            os.remove(self.any_db)

    def test_preprocess_duplicates_management_redis(self):
        """Test functionalities of the OROCI processor for producing META csv tables and INDEX tables, when multiple
        citations with a common id involved are processed. Expected output, given two citations with the same citing
        entity: three rows in meta table, two rows in citations tables
        1) All the files in the TARs in input are correctly processed
        2) The number of files in input corresponds to the number of files in output (both for citation and for meta tables)
        3) The number of bibliographic entities corresponds to the number of citations in input *2 (citing + cited)
        """
        self._remove_decompressed_dirs(self.sample_dupl)
        citations_output_path = self._reset_output_dirs()

        preprocess(openaire_json_dir=self.sample_dupl, csv_dir=self.output_dir, publishers_filepath=self.publishers_file, orcid_doi_filepath=self.doi_orcid, cache=self.cache, max_workers=2)

        _, citations_in_output, unique_entities = \
            self._count_citations_and_entities(citations_output_path)

        expected_citations_in_output = 2
        # since the citing entity is the same for both citations

        expected_entities_in_output = 3
        # since the citing entity is the same for both citations

        self.assertEqual(expected_entities_in_output, unique_entities)
        self.assertEqual(expected_citations_in_output, citations_in_output)

        shutil.rmtree(citations_output_path)
        shutil.rmtree(self.output_dir)

        self._remove_decompressed_dirs(self.sample_dupl)

    def test_cache(self):
        '''Nothing should be produced in output, since the cache file reports that all the files in input were completed'''
        self._remove_decompressed_dirs(self.sample_2tar)
        citations_output_path = self._reset_output_dirs()

        # Pre-populate the cache so every input file is marked "completed".
        with open(self.cache_test1, "w") as write_cache:
            processed_files_dict = {'part1.tar': {'test/openaire_process/2_tar_sample/part1_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n3.gz': 'completed', 'test/openaire_process/2_tar_sample/part1_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n4.gz': 'completed', 'test/openaire_process/2_tar_sample/part1_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n2.gz': 'completed'}, 'part0.tar': {'test/openaire_process/2_tar_sample/part0_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n7.gz': 'completed', 'test/openaire_process/2_tar_sample/part0_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n5.gz': 'completed', 'test/openaire_process/2_tar_sample/part0_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n6.gz': 'completed'}}
            json.dump(processed_files_dict, write_cache)

        preprocess(openaire_json_dir=self.sample_2tar, csv_dir=self.output_dir, publishers_filepath=self.publishers_file, orcid_doi_filepath=self.doi_orcid, cache=self.cache_test1, max_workers=2, target=2)

        _, citations_in_output, unique_entities = \
            self._count_citations_and_entities(citations_output_path)

        expected_citations_in_output = 0
        # nothing processed: all input files are marked completed in the cache

        expected_entities_in_output = 0
        # nothing processed: all input files are marked completed in the cache

        self.assertEqual(expected_entities_in_output, unique_entities)
        self.assertEqual(expected_citations_in_output, citations_in_output)

        shutil.rmtree(citations_output_path)
        shutil.rmtree(self.output_dir)

        self._remove_decompressed_dirs(self.sample_2tar)

# Allow running this test module directly (e.g. `python openaire_process_test.py`)
# in addition to discovery via a test runner.
if __name__ == '__main__':
    unittest.main()