Coverage for test/preprocessing_test.py: 97%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# SPDX-FileCopyrightText: 2026 Marta Soricetti <marta.soricetti@unibo.it> 

3# 

4# SPDX-License-Identifier: ISC 

5 

6import glob 

7import gzip 

8import json 

9import math 

10import os.path 

11import shutil 

12import tarfile 

13import unittest 

14from os import makedirs, listdir 

15from os.path import exists, join 

16 

17import pandas as pd 

18 

19from oc_ds_converter.preprocessing.datacite import DatacitePreProcessing 

20from oc_ds_converter.preprocessing.nih import NIHPreProcessing 

21 

# Root directory that holds the on-disk fixtures used by these tests.
BASE_DIR = os.path.join("test", "preprocess")

class PreprocessingTest(unittest.TestCase):
    """Tests for the NIH and DataCite pre-processing (input-splitting) steps.

    Each test feeds a fixture input directory/tarball to a preprocessor,
    runs ``split_input()``, and checks the number of produced chunk files
    and the total number of entities they contain.
    """

    def setUp(self):
        self._input_dir_nih = os.path.join(BASE_DIR, "data_nih")
        self._input_dir_dc = os.path.join(BASE_DIR, "data_datacite")
        self._output_dir_preprocessing = os.path.join(BASE_DIR, "all_outputs")
        makedirs(self._output_dir_preprocessing, exist_ok=True)
        self._output_dir_nih = os.path.join(self._output_dir_preprocessing, "tmp_data_nih")
        self._input_tar_dc = os.path.join(self._input_dir_dc, "test_datastructure.tar")
        self._input_tar_dc_self_cit = os.path.join(
            self._input_dir_dc, "test_datastructure_self_citation.tar"
        )
        self._output_dir_dc = os.path.join(self._output_dir_preprocessing, "tmp_data_datacite")
        # Chunk sizes: rows per NIH output file, entities per DataCite output file.
        self._interval = 78
        self._interval_dc = 2
        self._relation_type_datacite = ["references", "isreferencedby", "cites", "iscitedby"]

    def tearDown(self):
        # Remove everything a test produced so runs stay independent.
        if exists(self._output_dir_preprocessing):
            shutil.rmtree(self._output_dir_preprocessing, ignore_errors=True)

    def _get_out_entities_files(self, dc_pp):
        """Return the 'jSonFile*' chunk files a DataCite preprocessor wrote."""
        list_of_files = dc_pp.get_all_files(self._output_dir_dc, ".json")[0]
        return [f for f in list_of_files if os.path.basename(f).startswith("jSonFile")]

    @staticmethod
    def _count_entities(files):
        """Sum the entities stored under the 'data' key across *files*.

        Uses a context manager so every file handle is closed deterministically.
        """
        total = 0
        for file in files:
            with open(file, encoding="utf8") as f:
                total += len(json.load(f)["data"])
        return total

    def test_nih_preprocessing(self):
        self._nih_pp = NIHPreProcessing(self._input_dir_nih, self._output_dir_nih, self._interval)
        self._nih_pp.split_input()
        len_lines = 0
        for file in self._nih_pp.get_all_files(self._input_dir_nih, self._nih_pp._req_type)[0]:
            len_lines += len(pd.read_csv(file))
        # Ceiling division: a partial last chunk still becomes one output file.
        number_of_files_produced = math.ceil(len_lines / self._interval)
        out_files = self._nih_pp.get_all_files(self._output_dir_nih, self._nih_pp._req_type)[0]
        self.assertTrue(len(out_files) > 0)
        self.assertEqual(len(out_files), number_of_files_produced)

    def test_dc_preprocessing(self):
        self._dc_pp = DatacitePreProcessing(self._input_tar_dc, self._output_dir_dc, self._interval_dc)
        self._dc_pp.split_input()
        out_entities_files = self._get_out_entities_files(self._dc_pp)
        if not out_entities_files:
            self.fail('No entities files found')
        # 3 entities split with interval 2 -> 2 output files.
        self.assertEqual(self._count_entities(out_entities_files), 3)
        self.assertEqual(len(out_entities_files), 2)

    def test_dc_preprocessing_self_citation(self):
        self._dc_pp = DatacitePreProcessing(
            self._input_tar_dc_self_cit, self._output_dir_dc, self._interval_dc
        )
        self._dc_pp.split_input()
        out_entities_files = self._get_out_entities_files(self._dc_pp)
        if not out_entities_files:
            self.fail('No entities files found')
        # Self-citations are dropped: only 2 entities survive, fitting one file.
        self.assertEqual(self._count_entities(out_entities_files), 2)
        self.assertEqual(len(out_entities_files), 1)

    def test_dc_preprocessing_interrupt_resume(self):
        # 1. Simulate an interrupted run: write a checkpoint recording that one
        #    archive member and 2 entities were already processed.
        state_file = os.path.join(self._output_dir_preprocessing, "processing_state.json")
        with open(state_file, 'w') as f:
            json.dump({
                "processed_files": ["dois/updated_2023_07/part_0079.jsonl.gz"],
                "count": 2
            }, f)

        self.assertTrue(os.path.exists(state_file))
        with open(state_file) as f:
            state = json.load(f)
        self.assertEqual(state['count'], 2)

        # 2. Resume & finish.
        resume_pp = DatacitePreProcessing(
            self._input_tar_dc, self._output_dir_dc, self._interval_dc, state_file
        )
        resume_pp.split_input()

        # 3. Full check: only the remaining entity is emitted, in one file.
        #    (Helpers close every JSON file; the original leaked handles via
        #    `json.load(open(f))`.)
        files_final = self._get_out_entities_files(resume_pp)
        self.assertEqual(len(files_final), 1)
        self.assertEqual(self._count_entities(files_final), 1)

115 

# Allow running this test module directly (e.g. `python preprocessing_test.py`).
if __name__ == '__main__':
    unittest.main()