Coverage for test/pubmed_process_test.py: 99%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023-2024 Arianna Moretti <arianna.moretti4@unibo.it> 

2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

3# SPDX-FileCopyrightText: 2026 Marta Soricetti <marta.soricetti@unibo.it> 

4# 

5# SPDX-License-Identifier: ISC 

6 

7 

8import os.path 

9import shutil 

10import unittest 

11from os.path import join 

12 

13from oc_ds_converter.run.pubmed_process_new import * 

14 

15 

class PubMedProcess(unittest.TestCase):
    """Unit tests for the PubMed processing helpers imported from
    ``oc_ds_converter.run.pubmed_process_new``.

    The active tests cover the pure chunk-bookkeeping functions:

    * ``find_missing_chuncks`` – detect gaps in already-processed row ranges;
    * ``new_chunks_distribution`` – split the remaining rows into chunks;
    * ``assign_chunks`` – assign chunks to processes from a cached state file.

    The end-to-end ``preprocess`` tests are kept below, commented out: they
    depend on bulky support material and (in some cases) live API calls.
    """

    # Show full diffs on assertion failures instead of truncated ones.
    maxDiff = None

    def setUp(self):
        """Build the filesystem paths used by the tests (no I/O happens here)."""
        self.test_dir = join("test", "pubmed_process")
        self.output_dir = join(self.test_dir, "tmp")
        self.support_mat = join(self.test_dir, "support_mat")
        self.doi_orcid = join("test", "pubmed_processing", "iod")

        self.publishers_file = join(self.support_mat, "publishers.json")
        self.journals_file = join(self.support_mat, "journals.json")

        # Directories/files that tests create and remove afterwards ("todel").
        self.publishers_dir_todel = join(self.support_mat, "publishers")
        self.publishers_file_todel = join(self.publishers_dir_todel, "publishers.json")

        self.journals_dir_todel = join(self.support_mat, "journals")
        self.journals_file_todel = join(self.journals_dir_todel, "journals.json")

        # Fabricated support material, used to check that user-provided data
        # is preferred over API recovery.
        self.madeup_data_dir = join(self.support_mat, "made_up_mat")
        self.madeup_publishers = join(self.madeup_data_dir, "publishers.json")
        self.madeup_journals = join(self.madeup_data_dir, "journals.json")
        self.madeup_input = join(self.madeup_data_dir, "input")
        self.madeup_iod = join(self.madeup_data_dir, "iod")

        self.input_dirt_short = join(self.test_dir, "csv_files_short")
        self.input_dirt_iod = join(self.test_dir, "csv_file_iod")
        self.input_dirt_sample = join(self.test_dir, "csv_files_sample")
        self.input_dirt_compr = join(self.test_dir, "CSV_iCiteMD_zipped.zip")

        self.processing_csv_row_base = join("test", "pubmed_processing")
        self._id_orcid_data = join(self.processing_csv_row_base, "iod")

        # Pre-built cache files, one per assign_chunks scenario, exposed as
        # self.cache1 .. self.cache9 (same attribute names as before).
        for i in range(1, 10):
            setattr(self, f"cache{i}", join(self.support_mat, f"cache{i}.json"))

    def test_find_missing_chuncks(self):
        """find_missing_chuncks returns (missing ranges, next first row),
        or None on an inconsistent set of processed chunks."""
        output = find_missing_chuncks([(7, 13)], 7)
        expected = ([(0, 6)], 14)
        self.assertEqual(output, expected)

        # No chunk processed yet: nothing missing, start from row 0.
        output = find_missing_chuncks([], 5)
        expected = ([], 0)
        self.assertEqual(output, expected)

        # Chunk sizes inconsistent with the declared interval: anomalous state.
        output = find_missing_chuncks([(0, 6), (7, 13)], 6)
        expected = None
        self.assertEqual(output, expected)

        output = find_missing_chuncks([(0, 6), (7, 13)], 7)
        expected = ([], 14)
        self.assertEqual(output, expected)

        output = find_missing_chuncks([(0, 6), (14, 20)], 7)
        expected = ([(7, 13)], 21)
        self.assertEqual(output, expected)

        output = find_missing_chuncks([(0, 6), (21, 27)], 7)
        expected = ([(7, 13), (14, 20)], 28)
        # The order of the missing chunks is not guaranteed: compare as
        # unordered collections.
        self.assertCountEqual(output[0], expected[0])
        self.assertEqual(output[1], expected[1])

    def test_new_chunks_distribution(self):
        """new_chunks_distribution splits the rows still to be processed into
        at most n_spare_processes chunks of <interval> rows each."""
        n_spare_processes = 5
        first_row_to_be_processed = 0
        interval = 10
        n_total_rows = 50
        result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
        expected = [(0, 9), (10, 19), (20, 29), (30, 39), (40, 49)]
        self.assertEqual(result, expected)

        # More spare processes than needed: no extra (empty) chunks appear.
        n_spare_processes = 6
        result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
        expected = [(0, 9), (10, 19), (20, 29), (30, 39), (40, 49)]
        self.assertEqual(result, expected)

        # Starting offset shifts every chunk; the last one is truncated.
        first_row_to_be_processed = 5
        result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
        expected = [(5, 14), (15, 24), (25, 34), (35, 44), (45, 49)]
        self.assertEqual(result, expected)

        first_row_to_be_processed = 6
        result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
        expected = [(6, 15), (16, 25), (26, 35), (36, 45), (46, 49)]
        self.assertEqual(result, expected)

        # A single spare process gets a single chunk.
        first_row_to_be_processed = 6
        n_spare_processes = 1
        interval = 3
        result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
        expected = [(6, 8)]
        self.assertEqual(result, expected)

        n_spare_processes = 3
        result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
        expected = [(6, 8), (9, 11), (12, 14)]
        self.assertEqual(result, expected)

        # No rows at all: nothing to distribute.
        n_total_rows = 0
        result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
        expected = []
        self.assertEqual(result, expected)

        # Exactly one row left (row 6 of 7): a single one-row chunk.
        n_total_rows = 7
        result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
        expected = [(6, 6)]
        self.assertEqual(result, expected)

        # First row to process is already past the end: nothing left.
        n_total_rows = 6
        result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
        expected = []
        self.assertEqual(result, expected)

    def test_assign_chunks(self):
        """assign_chunks returns (chunks to process, is_first_iteration) based
        on the state recorded in a cache file, or None on anomalous state."""

        # CASE 0: process start, no iteration has been started yet.
        n_processes = 5
        interval = 7
        n_total_rows = 28

        expected = ([(0, 6), (7, 13), (14, 20), (21, 27)], True)
        result = assign_chunks(n_processes, interval, n_total_rows, self.cache1, lock=None)
        self.assertEqual(result, expected)

        # CASE 1: the second iteration appears started but not the first:
        # anomalous state, the function returns None.
        expected = None
        result = assign_chunks(n_processes, interval, n_total_rows, self.cache2, lock=None)
        self.assertEqual(result, expected)

        # CASE 2: second iteration started.
        # CASE 2.1: second iteration started, no chunk skipped.
        expected = ([(7, 13), (14, 20), (21, 27)], False)
        result = assign_chunks(n_processes, interval, n_total_rows, self.cache3, lock=None)
        self.assertEqual(result, expected)

        # CASE 2.1.1: second iteration started, no chunk skipped, no rows left
        # to process.
        expected = ([], False)
        result = assign_chunks(n_processes, interval, n_total_rows, self.cache5, lock=None)
        self.assertEqual(result, expected)

        # CASE 2.1.2: second iteration started, no chunk skipped, more rows
        # still to process.
        expected = ([(14, 20), (21, 27)], False)
        result = assign_chunks(n_processes, interval, n_total_rows, self.cache6, lock=None)
        self.assertEqual(result, expected)

        # CASE 2.2: second iteration started, with skipped chunks.
        expected = ([(0, 6), (14, 20), (21, 27)], False)
        result = assign_chunks(n_processes, interval, n_total_rows, self.cache4, lock=None)
        self.assertEqual(result, expected)

        # CASE 3: first iteration started.
        # CASE 3.1: first iteration started, no chunk skipped.
        # CASE 3.1.1: first iteration started, no chunk skipped, no rows left
        # to process.
        expected = ([], True)
        result = assign_chunks(n_processes, interval, n_total_rows, self.cache8, lock=None)
        self.assertEqual(result, expected)

        # CASE 3.1.2: first iteration started, no chunk skipped, more rows
        # still to process.
        expected = ([(14, 20), (21, 27)], True)
        result = assign_chunks(n_processes, interval, n_total_rows, self.cache7, lock=None)
        self.assertEqual(result, expected)

        # CASE 3.2: first iteration started, with skipped chunks.
        expected = ([(0, 6), (21, 27)], True)
        result = assign_chunks(n_processes, interval, n_total_rows, self.cache9, lock=None)
        self.assertEqual(result, expected)

    # NOTE(review): the preprocess() integration tests below are disabled.
    # They are kept (not deleted) because they document the expected
    # end-to-end behaviour; re-enable them once the required support
    # material is available in the test environment.

    # def test_preprocess_base(self):
    #     """Test base functionalities of the POCI processor for producing META csv tables"""
    #     if os.path.exists(self.output_dir):
    #         shutil.rmtree(self.output_dir)
    #     preprocess(pubmed_csv_dir=self.input_dirt_sample, publishers_filepath=self.publishers_file, journals_filepath=self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid)
    #
    #     output = dict()
    #     for file in os.listdir(self.output_dir):
    #         with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
    #             output[file] = list(csv.DictReader(f))
    #     expected_output = {
    #         'CSVFile_1.csv': [
    #             {'id': 'pmid:118',
    #              'title': 'Proceedings: Comparison of the effects of selective alpha and beta-receptor agonists on intracellular cyclic AMP levels and glycogen phosphorylase activity in guinea-pig liver.',
    #              'author': 'D Osborn; D H Jenkinson',
    #              'pub_date': '1975',
    #              'venue': 'British journal of pharmacology [issn:0007-1188]',
    #              'volume': '',
    #              'issue': '',
    #              'page': '',
    #              'type': 'journal article',
    #              'publisher': '',
    #              'editor': ''},
    #             {'id': 'pmid:120',
    #              'title': 'Proceedings: Do anti-psychotic drugs act by dopamine receptor blockade in the nucleus accumbens.',
    #              'author': 'T J Crow; J F Deakin; A Longden',
    #              'pub_date': '1975',
    #              'venue': 'British journal of pharmacology [issn:0007-1188]',
    #              'volume': '',
    #              'issue': '',
    #              'page': '',
    #              'type': 'journal article',
    #              'publisher': '',
    #              'editor': ''},
    #             {'id': 'pmid:351 doi:10.2527/jas1975.4151249x',
    #              'title': 'Analyses of rumen fluid from "sudden death", lactic acidotic and healthy cattle fed high concentrate ration.',
    #              'author': 'J R Wilson; E E Bartley; H D Anthony; B E Brent; D A Sapienza; T E Chapman; A D Dayton; R J Milleret; R A Frey; R M Meyer',
    #              'pub_date': '1975',
    #              'venue': 'Journal of animal science [issn:0021-8812]',
    #              'volume': '',
    #              'issue': '',
    #              'page': '',
    #              'type': 'journal article',
    #              'publisher': 'American Society of Animal Science (ASAS)',
    #              'editor': ''},
    #             {'id': 'pmid:352 doi:10.2527/jas1975.4151314x',
    #              'title': 'Mitochondrial traits of muscle from stress-susceptible pigs.',
    #              'author': 'D R Campion; J C Olson; D G Topel; L L Christian; D L Kuhlers',
    #              'pub_date': '1975',
    #              'venue': 'Journal of animal science [issn:0021-8812]',
    #              'volume': '',
    #              'issue': '',
    #              'page': '',
    #              'type': 'journal article',
    #              'publisher': 'American Society of Animal Science (ASAS)',
    #              'editor': ''},
    #             {'id': 'pmid:353 doi:10.1152/jappl.1975.39.4.580',
    #              'title': 'Local control of pulmonary resistance and lung compliance in the canine lung.',
    #              'author': 'R L Coon; C C Rattenborg; J P Kampine',
    #              'pub_date': '1975',
    #              'venue': 'Journal of applied physiology [issn:0021-8987]',
    #              'volume': '',
    #              'issue': '',
    #              'page': '',
    #              'type': 'journal article',
    #              'publisher': 'American Physiological Society',
    #              'editor': ''}
    #         ]
    #     }
    #
    #     elements_in_output = list()
    #     for l in output.values():
    #         for e in l:
    #             elements_in_output.append(e)
    #
    #     elements_expected = list()
    #     for l in expected_output.values():
    #         for e in l:
    #             elements_expected.append(e)
    #
    #     self.assertCountEqual(elements_in_output, elements_expected)
    #     shutil.rmtree(self.output_dir)

    # def test_preprocess_interval_number(self):
    #     """Test that the processed rows are correctly distributed in output files, with respect to the
    #     interval number specified in input"""
    #
    #     if os.path.exists(self.output_dir):
    #         shutil.rmtree(self.output_dir)
    #     preprocess(pubmed_csv_dir=self.input_dirt_sample, publishers_filepath=self.publishers_file, journals_filepath=self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=2)
    #
    #     output = dict()
    #     n_files = 0
    #     for file in os.listdir(self.output_dir):
    #         n_files += 1
    #         with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
    #             output[n_files] = len(list(csv.DictReader(f)))
    #
    #     expected_files = 3
    #     expected_ents_per_file = 2
    #     last_file_ents = 1
    #     n_files_full = len([x for x in output.values() if x == 2])
    #     n_files_rem = len([x for x in output.values() if x == 1])
    #     self.assertEqual(expected_files, len(output.keys()))
    #     self.assertEqual(n_files_full, 2)
    #     self.assertEqual(n_files_rem, 1)
    #     self.assertTrue(max(output.values()) == expected_ents_per_file)
    #     self.assertTrue(min(output.values()) == last_file_ents)
    #     shutil.rmtree(self.output_dir)
    #
    # def test_preprocess_save_recovered_publishers(self):
    #     """Test that data is correctly recovered using API and stored in new support file, if no support material was provided in input"""
    #     if not os.path.exists(self.publishers_dir_todel):
    #         os.makedirs(self.publishers_dir_todel)
    #     if os.path.exists(self.output_dir):
    #         shutil.rmtree(self.output_dir)
    #     preprocess(pubmed_csv_dir=self.input_dirt_sample, publishers_filepath=self.publishers_file_todel, journals_filepath=self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=1)
    #     # test that the information processed was successfully saved each <interval> number of rows.
    #     prefixes_encountered = set()
    #     self.assertTrue(os.path.exists(self.publishers_file_todel))
    #     for file in os.listdir(self.output_dir):
    #         with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
    #             ent_list = list(csv.DictReader(f))
    #             for e in ent_list:
    #                 if e.get("id"):
    #                     doi = [x for x in e.get("id").split(" ") if x.startswith("doi")]
    #                     if doi:
    #                         for d in doi:
    #                             pref = d.split('/')[0]
    #                             if pref:
    #                                 lenpref = len("doi:")
    #                                 pref = pref[lenpref:]
    #                                 prefixes_encountered.add(pref)
    #
    #     with open(self.publishers_file_todel, "r") as dobj:
    #         pref_pub_dict = json.load(dobj)
    #     self.assertCountEqual(prefixes_encountered, pref_pub_dict.keys())
    #
    #     shutil.rmtree(self.output_dir)
    #     shutil.rmtree(self.publishers_dir_todel)
    #
    # def test_preprocess_zip_input(self):
    #     """Test that the processed on zip compressed input"""
    #
    #     if os.path.exists(self.output_dir):
    #         shutil.rmtree(self.output_dir)
    #     preprocess(pubmed_csv_dir=self.input_dirt_compr, publishers_filepath=self.publishers_file, journals_filepath=self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=200)
    #
    #     output = dict()
    #     for file in os.listdir(self.output_dir):
    #         with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
    #             output[file] = list(csv.DictReader(f))
    #     processed_lines = 0
    #     for k, v in output.items():
    #         processed_lines += len(v)
    #     self.assertEqual(processed_lines, 29)
    #
    #     todel_path = ".".join(self.input_dirt_compr.split(".")[:-1]) + "_decompr_zip_dir"
    #     shutil.rmtree(todel_path)
    #     shutil.rmtree(self.output_dir)
    #
    # def test_preprocess_save_recovered_journals(self):
    #     """Test that data is correctly recovered using API and stored in new support file, if no support material was provided in input"""
    #     if not os.path.exists(self.journals_dir_todel):
    #         os.makedirs(self.journals_dir_todel)
    #     if os.path.exists(self.output_dir):
    #         shutil.rmtree(self.output_dir)
    #     preprocess(pubmed_csv_dir=self.input_dirt_sample, publishers_filepath=self.publishers_file, journals_filepath=self.journals_file_todel, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=1)
    #     # test that the information processed was successfully saved at each <interval> number of rows.
    #     issns_encountered = set()
    #     self.assertTrue(os.path.exists(self.journals_file_todel))
    #
    #     for file in os.listdir(self.output_dir):
    #         with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
    #             ent_list = list(csv.DictReader(f))
    #             for e in ent_list:
    #                 if e.get("venue"):
    #                     if "[" in e.get("venue") and "]" in e.get("venue"):
    #                         split_venue_ids = e.get("venue").split("[")
    #                         keep_ids = split_venue_ids[1]
    #                         keep_ids = keep_ids.split("]")[0]
    #                         split_issn = keep_ids.split(" ")
    #                         issn = [x for x in split_issn if x.startswith("issn")]
    #                         if issn:
    #                             for i in issn:
    #                                 issns_encountered.add(i)
    #
    #     with open(self.journals_file_todel, "r") as dobj:
    #         jour_issns_dict = json.load(dobj)
    #     issn_in_map_file = set()
    #     for k, v in jour_issns_dict.items():
    #         for e in v["issn"]:
    #             issn_in_map_file.add(e)
    #
    #     self.assertCountEqual(issns_encountered, issn_in_map_file)
    #
    #     shutil.rmtree(self.output_dir)
    #     shutil.rmtree(self.journals_dir_todel)
    #
    # def test_preprocess_support_data(self):
    #     """Test that the support material is correctly used, if provided. In particular, fake data is used in this test, in order to check that the information provided in support material is preferred to the use of API, when possible"""
    #     if os.path.exists(self.output_dir):
    #         shutil.rmtree(self.output_dir)
    #     preprocess(pubmed_csv_dir=self.input_dirt_short, publishers_filepath=self.madeup_publishers, journals_filepath=self.madeup_journals, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=1)
    #     # test that the information processed was successfully saved after each <interval> number of rows.
    #     for file in os.listdir(self.output_dir):
    #         with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
    #             ent_list = list(csv.DictReader(f))
    #             for e in ent_list:
    #                 if "pmid:1" in e.get("id").split(" ") and "doi:10.1016/0006-2944(75)90147-7" in e.get("id").split(" "):
    #                     self.assertEqual(e.get("venue"), "Made Up Title 1 [issn:0001-000X]")
    #                     self.assertEqual(e.get("publisher"), "Made Up Publisher")
    #                 if "pmid:324" in e.get("id").split(" ") and "doi:10.1016/0019-2791(75)90174-3" in e.get("id").split(" "):
    #                     self.assertEqual(e.get("venue"), "Made Up Title 2 [issn:0000-0000]")
    #                     self.assertEqual(e.get("publisher"), "Made Up Publisher")
    #
    #     shutil.rmtree(self.output_dir)
    #
    # def test_preprocess_id_orcid_map(self):
    #     """Test the id-orcid mapping information is correctly used to associate the RA name to its ORCID id, if provided. """
    #     if os.path.exists(self.output_dir):
    #         shutil.rmtree(self.output_dir)
    #     preprocess(pubmed_csv_dir=self.input_dirt_iod, publishers_filepath=self.publishers_file, journals_filepath=self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=1)
    #     # test that the information processed was successfully saved each <interval> number of rows.
    #     for file in os.listdir(self.output_dir):
    #         with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
    #             ent_list = list(csv.DictReader(f))
    #             for e in ent_list:
    #                 if e.get("id") == "pmid:2 doi:10.1016/0006-291x(75)90482-9":
    #                     self.assertIn("Sarma, R H [orcid:0000-0000-0000-0000]", e.get("author"))
    #     shutil.rmtree(self.output_dir)
    #
    # def test_preprocess_id_orcid_map_with_homonyms(self):
    #     """Test the id-orcid mapping information is correctly used to associate the RA name to its ORCID id, if provided. """
    #
    #     if os.path.exists(self.output_dir):
    #         shutil.rmtree(self.output_dir)
    #     preprocess(pubmed_csv_dir=self.madeup_input, publishers_filepath=self.publishers_file, journals_filepath=self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.madeup_iod, interval=1)
    #     # test that the information processed was successfully saved at each <interval> number of rows.
    #     processed_ents = []
    #     for file in os.listdir(self.output_dir):
    #         with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
    #             ent_list = list(csv.DictReader(f))
    #             for e in ent_list:
    #                 processed_ents.append(e)
    #     ent = processed_ents[0]
    #     expected_aut_cont = "K S Bose; R H Sarma; Sarma, Harold R. [orcid:0000-0000-0000-0005]; Sarma R. Henry Jack; Sarma, Roy Henry [orcid:0000-0000-0000-0000]"
    #     self.assertEqual(expected_aut_cont, ent.get("author"))
    #
    #     shutil.rmtree(self.output_dir)

# Allow running this test module directly (e.g. `python pubmed_process_test.py`).
if __name__ == "__main__":
    unittest.main()