Coverage for test/preprocessing_test.py: 97% (87 statements)
# SPDX-FileCopyrightText: 2023 Arcangelo Massari <arcangelo.massari@unibo.it>
# SPDX-FileCopyrightText: 2026 Marta Soricetti <marta.soricetti@unibo.it>
#
# SPDX-License-Identifier: ISC

import json
import math
import os.path
import shutil
import unittest
from os import makedirs
from os.path import exists

import pandas as pd

from oc_ds_converter.preprocessing.datacite import DatacitePreProcessing
from oc_ds_converter.preprocessing.nih import NIHPreProcessing

BASE_DIR = os.path.join("test", "preprocess")


class PreprocessingTest(unittest.TestCase):
    def setUp(self):
        self._input_dir_nih = os.path.join(BASE_DIR, "data_nih")
        self._input_dir_dc = os.path.join(BASE_DIR, "data_datacite")
        self._output_dir_preprocessing = os.path.join(BASE_DIR, "all_outputs")
        makedirs(self._output_dir_preprocessing, exist_ok=True)
        self._output_dir_nih = os.path.join(self._output_dir_preprocessing, "tmp_data_nih")
        self._input_tar_dc = os.path.join(self._input_dir_dc, "test_datastructure.tar")
        self._input_tar_dc_self_cit = os.path.join(self._input_dir_dc, "test_datastructure_self_citation.tar")
        self._output_dir_dc = os.path.join(self._output_dir_preprocessing, "tmp_data_datacite")
        self._interval = 78  # max rows per NIH output chunk
        self._interval_dc = 2  # max entities per DataCite output chunk
        self._relation_type_datacite = ["references", "isreferencedby", "cites", "iscitedby"]

    def tearDown(self):
        if exists(self._output_dir_preprocessing):
            shutil.rmtree(self._output_dir_preprocessing, ignore_errors=True)
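
    # Note: the NIH test below assumes NIHPreProcessing.split_input() chunks the
    # input CSV rows into output files of at most `self._interval` rows each, so
    # the expected number of output files is ceil(total_rows / interval); e.g.
    # with interval=78, 100 input rows should yield 2 files.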

    def test_nih_preprocessing(self):
        self._nih_pp = NIHPreProcessing(self._input_dir_nih, self._output_dir_nih, self._interval)
        self._nih_pp.split_input()
        len_lines = 0
        for file in self._nih_pp.get_all_files(self._input_dir_nih, self._nih_pp._req_type)[0]:
            len_lines += len(pd.read_csv(file))
        number_of_files_produced = math.ceil(len_lines / self._interval)
        self.assertTrue(len(self._nih_pp.get_all_files(self._output_dir_nih, self._nih_pp._req_type)[0]) > 0)
        self.assertEqual(len(self._nih_pp.get_all_files(self._output_dir_nih, self._nih_pp._req_type)[0]), number_of_files_produced)
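
    # The two DataCite tests below assume split_input() walks the .jsonl.gz
    # members of the input tar, keeps only entities carrying citation relations
    # (cf. self._relation_type_datacite), and writes them to JSON files named
    # 'jSonFile*' whose top-level "data" list holds at most `self._interval_dc`
    # (here 2) entities per file.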

    def test_dc_preprocessing(self):
        self._dc_pp = DatacitePreProcessing(self._input_tar_dc, self._output_dir_dc, self._interval_dc)
        self._dc_pp.split_input()
        list_of_files = self._dc_pp.get_all_files(self._output_dir_dc, '.json')[0]
        out_entities_files = [file for file in list_of_files if os.path.basename(file).startswith('jSonFile')]
        if out_entities_files:
            all_processed_entities = 0
            for file in out_entities_files:
                with open(file, encoding="utf8") as f:
                    recover_dict = json.load(f)
                data_list = recover_dict["data"]
                all_processed_entities += len(data_list)
            self.assertEqual(all_processed_entities, 3)
            self.assertEqual(len(out_entities_files), 2)
        else:
            self.fail('No entities files found')

    def test_dc_preprocessing_self_citation(self):
        self._dc_pp = DatacitePreProcessing(self._input_tar_dc_self_cit, self._output_dir_dc, self._interval_dc)
        self._dc_pp.split_input()
        list_of_files = self._dc_pp.get_all_files(self._output_dir_dc, '.json')[0]
        out_entities_files = [file for file in list_of_files if os.path.basename(file).startswith('jSonFile')]
        if out_entities_files:
            all_processed_entities = 0
            for file in out_entities_files:
                with open(file, encoding="utf8") as f:
                    recover_dict = json.load(f)
                data_list = recover_dict["data"]
                all_processed_entities += len(data_list)
            self.assertEqual(all_processed_entities, 2)
            self.assertEqual(len(out_entities_files), 1)
        else:
            self.fail('No entities files found')
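
    # The resume test below assumes DatacitePreProcessing accepts a checkpoint
    # file as an optional fourth argument: tar members listed under
    # "processed_files" are skipped on resume and "count" seeds the running
    # entity counter, so only the remaining members are processed.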

    def test_dc_preprocessing_interrupt_resume(self):
        # 1. Simulate an interrupted run: write a checkpoint that marks one
        #    tar member as already processed, with 2 entities already counted.
        state_file = os.path.join(self._output_dir_preprocessing, "processing_state.json")
        with open(state_file, 'w') as f:
            json.dump({
                "processed_files": ["dois/updated_2023_07/part_0079.jsonl.gz"],
                "count": 2
            }, f)
        self.assertTrue(os.path.exists(state_file))
        with open(state_file) as f:
            state = json.load(f)
        self.assertEqual(state['count'], 2)

        # 2. Resume from the checkpoint and finish the run.
        resume_pp = DatacitePreProcessing(self._input_tar_dc, self._output_dir_dc, self._interval_dc, state_file)
        resume_pp.split_input()

        # 3. Check the final output: only the remaining member is processed.
        files_final = [file for file in resume_pp.get_all_files(self._output_dir_dc, '.json')[0]
                       if 'jSonFile' in os.path.basename(file)]
        all_entities = 0
        for file in files_final:
            with open(file, encoding="utf8") as f:
                all_entities += len(json.load(f)["data"])
        self.assertEqual(len(files_final), 1)
        self.assertEqual(all_entities, 1)


if __name__ == '__main__':
    unittest.main()