Coverage for test / pubmed_process_test.py: 99%
134 statements
« prev ^ index » next    coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023-2024 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3# SPDX-FileCopyrightText: 2026 Marta Soricetti <marta.soricetti@unibo.it>
4#
5# SPDX-License-Identifier: ISC
8import os.path
9import shutil
10import unittest
11from os.path import join
13from oc_ds_converter.run.pubmed_process_new import *
class PubMedProcess(unittest.TestCase):
    """Tests for the PubMed processing helpers (chunk bookkeeping and, in the
    commented-out legacy tests, the CSV preprocessing pipeline)."""

    maxDiff = None

    def setUp(self):
        """Build every fixture path used by the tests below."""
        self.test_dir = join("test", "pubmed_process")
        self.output_dir = join(self.test_dir, "tmp")
        self.support_mat = join(self.test_dir, "support_mat")
        self.doi_orcid = join("test", "pubmed_processing", "iod")

        # Persistent support files mapping DOI prefixes / ISSNs to names.
        self.publishers_file = join(self.support_mat, "publishers.json")
        self.journals_file = join(self.support_mat, "journals.json")

        # Disposable support material, created and removed by individual tests.
        self.publishers_dir_todel = join(self.support_mat, "publishers")
        self.publishers_file_todel = join(self.publishers_dir_todel, "publishers.json")
        self.journals_dir_todel = join(self.support_mat, "journals")
        self.journals_file_todel = join(self.journals_dir_todel, "journals.json")

        # Synthetic fixtures used to verify that provided support material is
        # preferred over API recovery.
        self.madeup_data_dir = join(self.support_mat, "made_up_mat")
        self.madeup_publishers = join(self.madeup_data_dir, "publishers.json")
        self.madeup_journals = join(self.madeup_data_dir, "journals.json")
        self.madeup_input = join(self.madeup_data_dir, "input")
        self.madeup_iod = join(self.madeup_data_dir, "iod")

        # Input CSV material in several flavours: short sample, id-orcid
        # sample, full sample, and a zip-compressed archive.
        self.input_dirt_short = join(self.test_dir, "csv_files_short")
        self.input_dirt_iod = join(self.test_dir, "csv_file_iod")
        self.input_dirt_sample = join(self.test_dir, "csv_files_sample")
        self.input_dirt_compr = join(self.test_dir, "CSV_iCiteMD_zipped.zip")

        self.processing_csv_row_base = os.path.join('test', 'pubmed_processing')
        self._id_orcid_data = os.path.join(self.processing_csv_row_base, 'iod')

        # Pre-built cache files cache1.json .. cache9.json, each describing a
        # different state of an interrupted run; see test_assign_chunks.
        for idx in range(1, 10):
            setattr(
                self,
                f"cache{idx}",
                os.path.join('test', 'pubmed_process', 'support_mat', f'cache{idx}.json'),
            )
57 def test_find_missing_chuncks(self):
59 output = find_missing_chuncks([(7,13)], 7)
60 expected = ([(0, 6)], 14)
62 self.assertEqual(output, expected)
64 output = find_missing_chuncks([], 5)
65 expected = ([], 0)
67 self.assertEqual(output, expected)
69 output = find_missing_chuncks([(0,6), (7,13)], 6)
70 expected = (None)
72 self.assertEqual(output, expected)
74 output = find_missing_chuncks([(0,6), (7,13)], 7)
75 expected = ([], 14)
77 self.assertEqual(output, expected)
79 output = find_missing_chuncks([(0,6), (14,20)], 7)
80 expected = ([(7,13)], 21)
82 self.assertEqual(output, expected)
84 output = find_missing_chuncks([(0,6), (21,27)], 7)
85 expected = ([(7,13), (14, 20)], 28)
87 self.assertTrue(all(x in output[0] for x in expected[0]))
88 self.assertTrue(all(x in expected[0] for x in output[0]))
89 self.assertEqual(output[1], expected[1])
91 def test_new_chunks_distribution(self):
92 n_spare_processes = 5
93 first_row_to_be_processed = 0
94 interval = 10
95 n_total_rows = 50
96 result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
97 expected = [(0, 9), (10, 19), (20, 29), (30, 39), (40, 49) ]
98 self.assertEqual(result, expected)
100 n_spare_processes = 6
101 result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
102 expected = [(0, 9), (10, 19), (20, 29), (30, 39), (40, 49)]
103 self.assertEqual(result, expected)
105 first_row_to_be_processed = 5
106 result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
107 expected = [(5, 14), (15, 24), (25, 34), (35, 44), (45, 49)]
108 self.assertEqual(result, expected)
110 first_row_to_be_processed = 6
111 result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
112 expected = [(6, 15), (16, 25), (26, 35), (36, 45), (46, 49)]
113 self.assertEqual(result, expected)
115 first_row_to_be_processed = 6
116 n_spare_processes = 1
117 interval = 3
118 result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
119 expected = [(6, 8)]
120 self.assertEqual(result, expected)
122 n_spare_processes = 3
123 result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
124 expected = [(6, 8), (9,11), (12,14)]
125 self.assertEqual(result, expected)
127 n_total_rows = 0
128 result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
129 expected = []
130 self.assertEqual(result, expected)
132 n_total_rows = 7
133 result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
134 expected = [(6, 6)]
135 self.assertEqual(result, expected)
137 n_total_rows = 6
138 result = new_chunks_distribution(n_spare_processes, first_row_to_be_processed, interval, n_total_rows)
139 expected = []
140 self.assertEqual(result, expected)
142 def test_assign_chunks(self):
144 # CASO 0 : inizio del processo, nessuna iterazione è stata iniziata.
145 n_processes = 5
146 interval = 7
147 n_total_rows = 28
149 expected = ([(0, 6), (7, 13), (14, 20), (21, 27)], True)
150 result = assign_chunks(n_processes, interval, n_total_rows, self.cache1, lock=None)
151 self.assertEqual(result, expected)
153 # CASO 1: risulta iniziata la seconda iterazione ma non la prima: comportamento anomalo, return None
155 expected = None
156 result = assign_chunks(n_processes, interval, n_total_rows, self.cache2, lock=None)
157 self.assertEqual(result, expected)
159 # CASO 2: Seconda iterazione iniziata
160 # CASO 2.1: Seconda iterazione iniziata, nessun chunk saltato
161 expected = ([(7, 13), (14, 20), (21, 27)], False)
162 result = assign_chunks(n_processes, interval, n_total_rows, self.cache3, lock=None)
163 self.assertEqual(result, expected)
165 # CASO 2.1.1 Seconda iterazione iniziata, nessun chunk saltato, non ci sono più row da processare
166 expected = ([], False)
167 result = assign_chunks(n_processes, interval, n_total_rows, self.cache5, lock=None)
168 self.assertEqual(result, expected)
170 # CASO 2.1.2 Seconda iterazione iniziata, nessun chunk saltato, ci sono altre row da processare
171 expected = ([(14, 20), (21, 27)], False)
172 result = assign_chunks(n_processes, interval, n_total_rows, self.cache6, lock=None)
173 self.assertEqual(result, expected)
175 # CASO 2.2: Seconda iterazione iniziata, con chunk saltati
176 expected = ([(0, 6), (14, 20), (21, 27)], False)
177 result = assign_chunks(n_processes, interval, n_total_rows, self.cache4, lock=None)
178 self.assertEqual(result, expected)
181 # CASO 3: Prima iterazione iniziata
182 # CASO 3.1: Prima iterazione iniziata, nessun chunk saltato
183 # CASO 3.1.1 Prima iterazione iniziata, nessun chunk saltato, non ci sono più row da processare
184 expected = ([], True)
185 result = assign_chunks(n_processes, interval, n_total_rows, self.cache8, lock=None)
186 self.assertEqual(result, expected)
188 # CASO 3.1.2 Prima iterazione iniziata, nessun chunk saltato, ci sono altre row da processare
189 expected = ([(14, 20), (21, 27)], True)
190 result = assign_chunks(n_processes, interval, n_total_rows, self.cache7, lock=None)
191 self.assertEqual(result, expected)
193 # CASO 3.2: Prima iterazione iniziata, con chunk saltati
194 expected = ([(0, 6), (21, 27)], True)
195 result = assign_chunks(n_processes, interval, n_total_rows, self.cache9, lock=None)
196 self.assertEqual(result, expected)
200 # def test_preprocess_base(self):
201 # """Test base functionalities of the POCI processor for producing META csv tables"""
202 # if os.path.exists(self.output_dir):
203 # shutil.rmtree(self.output_dir)
204 # preprocess(pubmed_csv_dir=self.input_dirt_sample, publishers_filepath=self.publishers_file, journals_filepath= self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid)
205 #
206 # output = dict()
207 # for file in os.listdir(self.output_dir):
208 # with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
209 # output[file] = list(csv.DictReader(f))
210 # expected_output= {
211 # 'CSVFile_1.csv':
212 # [
213 # {'id': 'pmid:118',
214 # 'title': 'Proceedings: Comparison of the effects of selective alpha and beta-receptor agonists on intracellular cyclic AMP levels and glycogen phosphorylase activity in guinea-pig liver.',
215 # 'author': 'D Osborn; D H Jenkinson',
216 # 'pub_date': '1975',
217 # 'venue': 'British journal of pharmacology [issn:0007-1188]',
218 # 'volume': '',
219 # 'issue': '',
220 # 'page': '',
221 # 'type': 'journal article',
222 # 'publisher': '',
223 # 'editor': ''},
224 # {'id': 'pmid:120',
225 # 'title': 'Proceedings: Do anti-psychotic drugs act by dopamine receptor blockade in the nucleus accumbens.',
226 # 'author': 'T J Crow; J F Deakin; A Longden',
227 # 'pub_date': '1975',
228 # 'venue': 'British journal of pharmacology [issn:0007-1188]',
229 # 'volume': '',
230 # 'issue': '',
231 # 'page': '',
232 # 'type': 'journal article',
233 # 'publisher': '',
234 # 'editor': ''},
235 # {'id': 'pmid:351 doi:10.2527/jas1975.4151249x',
236 # 'title': 'Analyses of rumen fluid from "sudden death", lactic acidotic and healthy cattle fed high concentrate ration.',
237 # 'author': 'J R Wilson; E E Bartley; H D Anthony; B E Brent; D A Sapienza; T E Chapman; A D Dayton; R J Milleret; R A Frey; R M Meyer',
238 # 'pub_date': '1975',
239 # 'venue': 'Journal of animal science [issn:0021-8812]',
240 # 'volume': '',
241 # 'issue': '',
242 # 'page': '',
243 # 'type': 'journal article',
244 # 'publisher': 'American Society of Animal Science (ASAS)',
245 # 'editor': ''},
246 # {'id': 'pmid:352 doi:10.2527/jas1975.4151314x',
247 # 'title': 'Mitochondrial traits of muscle from stress-susceptible pigs.',
248 # 'author': 'D R Campion; J C Olson; D G Topel; L L Christian; D L Kuhlers',
249 # 'pub_date': '1975',
250 # 'venue': 'Journal of animal science [issn:0021-8812]',
251 # 'volume': '',
252 # 'issue': '',
253 # 'page': '',
254 # 'type': 'journal article',
255 # 'publisher': 'American Society of Animal Science (ASAS)',
256 # 'editor': ''},
257 # {'id': 'pmid:353 doi:10.1152/jappl.1975.39.4.580',
258 # 'title': 'Local control of pulmonary resistance and lung compliance in the canine lung.',
259 # 'author': 'R L Coon; C C Rattenborg; J P Kampine',
260 # 'pub_date': '1975',
261 # 'venue': 'Journal of applied physiology [issn:0021-8987]',
262 # 'volume': '',
263 # 'issue': '',
264 # 'page': '',
265 # 'type': 'journal article',
266 # 'publisher': 'American Physiological Society',
267 # 'editor': ''}
268 # ]
269 # }
270 #
271 # elements_in_output = list()
272 # for l in output.values():
273 # for e in l:
274 # elements_in_output.append(e)
275 #
276 # elements_expected = list()
277 # for l in expected_output.values():
278 # for e in l:
279 # elements_expected.append(e)
280 #
281 # self.assertCountEqual(elements_in_output, elements_expected)
282 # shutil.rmtree(self.output_dir)
284 # def test_preprocess_interval_number(self):
285 # """Test that the processed rows are correctly distributed in output files, with respect to the
286 # interval number specified in input"""
287 #
288 # if os.path.exists(self.output_dir):
289 # shutil.rmtree(self.output_dir)
290 # preprocess(pubmed_csv_dir=self.input_dirt_sample, publishers_filepath=self.publishers_file, journals_filepath= self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=2)
291 #
292 # output = dict()
293 # n_files = 0
294 # for file in os.listdir(self.output_dir):
295 # n_files += 1
296 # with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
297 # output[n_files] = len(list(csv.DictReader(f)))
298 #
299 # expected_files = 3
300 # expected_ents_per_file = 2
301 # last_file_ents = 1
302 # n_files_full = len([x for x in output.values() if x == 2])
303 # n_files_rem= len([x for x in output.values() if x == 1])
304 # self.assertEqual(expected_files, len(output.keys()))
305 # self.assertEqual(n_files_full, 2)
306 # self.assertEqual(n_files_rem, 1)
307 # self.assertTrue(max(output.values())==expected_ents_per_file)
308 # self.assertTrue(min(output.values())==last_file_ents)
309 # shutil.rmtree(self.output_dir)
310 #
311 # def test_preprocess_save_recovered_publishers(self):
312 # """Test that data is correctly recovered using API and stored in new support file, if no support material was provided in input"""
313 # if not os.path.exists(self.publishers_dir_todel):
314 # os.makedirs(self.publishers_dir_todel)
315 # if os.path.exists(self.output_dir):
316 # shutil.rmtree(self.output_dir)
317 # preprocess(pubmed_csv_dir=self.input_dirt_sample, publishers_filepath=self.publishers_file_todel, journals_filepath= self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=1)
318 # #test that the information processed was successfully saved each <interval> number of rows.
319 # prefixes_encountered = set()
320 # self.assertTrue(os.path.exists(self.publishers_file_todel))
321 # for file in os.listdir(self.output_dir):
322 # with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
323 # ent_list = list(csv.DictReader(f))
324 # for e in ent_list:
325 # if e.get("id"):
326 # doi = [x for x in e.get("id").split(" ") if x.startswith("doi")]
327 # if doi:
328 # for d in doi:
329 # pref = d.split('/')[0]
330 # if pref:
331 # lenpref = len("doi:")
332 # pref = pref[lenpref:]
333 # prefixes_encountered.add(pref)
334 #
335 # with open(self.publishers_file_todel, "r") as dobj:
336 # pref_pub_dict = json.load(dobj)
337 # self.assertCountEqual(prefixes_encountered, pref_pub_dict.keys())
338 #
339 # shutil.rmtree(self.output_dir)
340 # shutil.rmtree(self.publishers_dir_todel)
341 #
342 # def test_preprocess_zip_input(self):
343 # """Test that the processed on zip compressed input"""
344 #
345 # if os.path.exists(self.output_dir):
346 # shutil.rmtree(self.output_dir)
347 # preprocess(pubmed_csv_dir=self.input_dirt_compr, publishers_filepath=self.publishers_file, journals_filepath= self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=200)
348 #
349 # output = dict()
350 # for file in os.listdir(self.output_dir):
351 # with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
352 # output[file] = list(csv.DictReader(f))
353 # processed_lines = 0
354 # for k,v in output.items():
355 # processed_lines += len(v)
356 # self.assertEqual(processed_lines, 29)
357 #
358 # todel_path = ".".join(self.input_dirt_compr.split(".")[:-1])+"_decompr_zip_dir"
359 # shutil.rmtree(todel_path)
360 # shutil.rmtree(self.output_dir)
361 #
362 #
363 # def test_preprocess_save_recovered_journals(self):
364 # """Test that data is correctly recovered using API and stored in new support file, if no support material was provided in input"""
365 # if not os.path.exists(self.journals_dir_todel):
366 # os.makedirs(self.journals_dir_todel)
367 # if os.path.exists(self.output_dir):
368 # shutil.rmtree(self.output_dir)
369 # preprocess(pubmed_csv_dir=self.input_dirt_sample, publishers_filepath=self.publishers_file, journals_filepath= self.journals_file_todel, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=1)
370 # #test that the information processed was successfully saved at each <interval> number of rows.
371 # issns_encountered = set()
372 # self.assertTrue(os.path.exists(self.journals_file_todel))
373 #
374 # for file in os.listdir(self.output_dir):
375 # with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
376 # ent_list = list(csv.DictReader(f))
377 # for e in ent_list:
378 # if e.get("venue"):
379 # if "[" in e.get("venue") and "]" in e.get("venue"):
380 # split_venue_ids = e.get("venue").split("[")
381 # keep_ids=split_venue_ids[1]
382 # keep_ids = keep_ids.split("]")[0]
383 # split_issn = keep_ids.split(" ")
384 # issn = [x for x in split_issn if x.startswith("issn")]
385 # if issn:
386 # for i in issn:
387 # issns_encountered.add(i)
388 #
389 # with open(self.journals_file_todel, "r") as dobj:
390 # jour_issns_dict = json.load(dobj)
391 # issn_in_map_file = set()
392 # for k,v in jour_issns_dict.items():
393 # for e in v["issn"]:
394 # issn_in_map_file.add(e)
395 #
396 # self.assertCountEqual(issns_encountered, issn_in_map_file)
397 #
398 # shutil.rmtree(self.output_dir)
399 # shutil.rmtree(self.journals_dir_todel)
400 #
401 # def test_preprocess_support_data(self):
402 # """Test that the support material is correctly used, if provided. In particular, fake data is used in this test, in order to check that the information provided in support material is preferred to the use of API, when possible"""
403 # if os.path.exists(self.output_dir):
404 # shutil.rmtree(self.output_dir)
405 # preprocess(pubmed_csv_dir=self.input_dirt_short, publishers_filepath=self.madeup_publishers, journals_filepath= self.madeup_journals, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=1)
406 # #test that the information processed was successfully saved after each <interval> number of rows.
407 # for file in os.listdir(self.output_dir):
408 # with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
409 # ent_list = list(csv.DictReader(f))
410 # for e in ent_list:
411 # if "pmid:1" in e.get("id").split(" ") and "doi:10.1016/0006-2944(75)90147-7" in e.get("id").split(" "):
412 # self.assertEqual(e.get("venue"), "Made Up Title 1 [issn:0001-000X]")
413 # self.assertEqual(e.get("publisher"), "Made Up Publisher")
414 # if "pmid:324" in e.get("id").split(" ") and "doi:10.1016/0019-2791(75)90174-3" in e.get("id").split(" "):
415 # self.assertEqual(e.get("venue"), "Made Up Title 2 [issn:0000-0000]")
416 # self.assertEqual(e.get("publisher"), "Made Up Publisher")
417 #
418 #
419 # shutil.rmtree(self.output_dir)
420 #
421 # def test_preprocess_id_orcid_map(self):
422 # """Test the id-orcid mapping information is correctly used to associate the RA name to its ORCID id, if provided. """
423 # if os.path.exists(self.output_dir):
424 # shutil.rmtree(self.output_dir)
425 # preprocess(pubmed_csv_dir=self.input_dirt_iod, publishers_filepath=self.publishers_file, journals_filepath= self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.doi_orcid, interval=1)
426 # #test that the information processed was successfully saved each <interval> number of rows.
427 # for file in os.listdir(self.output_dir):
428 # with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
429 # ent_list = list(csv.DictReader(f))
430 # for e in ent_list:
431 # if e.get("id") == "pmid:2 doi:10.1016/0006-291x(75)90482-9":
432 # self.assertIn("Sarma, R H [orcid:0000-0000-0000-0000]", e.get("author"))
433 # shutil.rmtree(self.output_dir)
434 #
435 # def test_preprocess_id_orcid_map_with_homonyms(self):
436 # """Test the id-orcid mapping information is correctly used to associate the RA name to its ORCID id, if provided. """
437 #
438 # if os.path.exists(self.output_dir):
439 # shutil.rmtree(self.output_dir)
440 # preprocess(pubmed_csv_dir=self.madeup_input, publishers_filepath=self.publishers_file, journals_filepath= self.journals_file, csv_dir=self.output_dir, orcid_doi_filepath=self.madeup_iod, interval=1)
441 # #test that the information processed was successfully saved at each <interval> number of rows.
442 # processed_ents = []
443 # for file in os.listdir(self.output_dir):
444 # with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
445 # ent_list = list(csv.DictReader(f))
446 # for e in ent_list:
447 # processed_ents.append(e)
448 # ent = processed_ents[0]
449 # expected_aut_cont = "K S Bose; R H Sarma; Sarma, Harold R. [orcid:0000-0000-0000-0005]; Sarma R. Henry Jack; Sarma, Roy Henry [orcid:0000-0000-0000-0000]"
450 # self.assertEqual(expected_aut_cont, ent.get("author"))
451 #
452 # shutil.rmtree(self.output_dir)
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()