# Coverage report artifact for test/openaire_process_test.py: 88% of 239 statements covered
# (generated by coverage.py v7.13.4 at 2026-03-25 18:06 +0000)
1# SPDX-FileCopyrightText: 2023-2024 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2023-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
3#
4# SPDX-License-Identifier: ISC
6import os.path
7import shutil
8import unittest
9from os.path import join
10from oc_ds_converter.run.openaire_process import *
13class OpenAireProcessTest(unittest.TestCase):
14 maxDiff = None
15 def setUp(self):
16 self.test_dir = join("test", "openaire_process")
17 self.sample_1tar = join(self.test_dir, "1_tar_sample")
18 self.sample_dupl = join(self.test_dir, "duplicates_sample")
19 self.sample_2tar = join(self.test_dir, "2_tar_sample")
20 self.sample_1tar_alt = join(self.test_dir, "1alt_tar_sample")
21 self.output_dir = join(self.test_dir, "tmp")
22 self.support_mat = join(self.test_dir, "support_mat")
23 self.cache_test1 = join(self.support_mat, "cache_1.json")
24 self.doi_orcid = join("test", "openaire_processing", "iod")
26 self.any_db = join("test", "openaire_process", "anydb.db")
28 self.publishers_file = join(self.support_mat, "publishers.json")
29 self.journals_file = join(self.support_mat, "journals.json")
31 self.publishers_dir_todel = join(self.support_mat, "publishers")
32 self.publishers_file_todel = join(self.publishers_dir_todel, "publishers.json")
34 self.journals_dir_todel = join(self.support_mat, "journals")
35 self.journals_file_todel = join(self.journals_dir_todel, "journals.json")
37 self.madeup_data_dir = join(self.support_mat, "made_up_mat")
38 self.madeup_publishers = join(self.madeup_data_dir, "publishers.json")
39 self.madeup_journals = join(self.madeup_data_dir,"journals.json")
40 self.madeup_input = join(self.madeup_data_dir,"input")
41 self.madeup_iod = join(self.madeup_data_dir,"iod")
43 self.input_dirt_short= join(self.test_dir,"csv_files_short")
44 self.input_dirt_iod= join(self.test_dir,"csv_file_iod")
45 self.input_dirt_sample= join(self.test_dir,"csv_files_sample")
46 self.input_dirt_compr= join(self.test_dir,"CSV_iCiteMD_zipped.zip")
48 self.processing_csv_row_base = os.path.join('test', 'openaire_processing')
49 self._id_orcid_data = os.path.join(self.processing_csv_row_base, 'iod')
50 self.cache = os.path.join(os.getcwd(), "cache.json")
52 def test_preprocess_base_decompress_and_read(self):
53 """Test base functionalities of the OROCI processor for producing META csv tables and INDEX tables:
54 1) All the files in the TARs in input are correctly processed
55 2) The number of files in input corresponds to the number of files in output (both for citation and for meta tables)
56 3) The number of bibliographic entities corresponds to the number of citations in input *2 (citing + cited)
57 """
58 for el in os.listdir(self.sample_2tar):
59 if el.endswith("decompr_zip_dir"):
60 shutil.rmtree(os.path.join(self.sample_2tar, el))
62 if os.path.exists(self.output_dir):
63 shutil.rmtree(self.output_dir)
65 citations_output_path = self.output_dir + "_citations"
66 if os.path.exists(citations_output_path):
67 shutil.rmtree(citations_output_path)
68 preprocess(openaire_json_dir=self.sample_2tar, csv_dir=self.output_dir, publishers_filepath=self.publishers_file, orcid_doi_filepath=self.doi_orcid, cache=self.cache)
70 citations_in_output = 0
71 encountered_ids = set()
72 unique_entities = 0
74 citations_files_n = len(list(os.listdir(citations_output_path)))
75 for file in os.listdir(citations_output_path):
76 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
77 cits_rows = list(csv.DictReader(f))
78 citations_in_output += len(cits_rows)
79 for x in cits_rows:
80 citing_ids = x["citing"].split(" ")
81 citied_ids = x["referenced"].split(" ")
82 if all(id not in encountered_ids for id in citing_ids):
83 unique_entities += 1
84 encountered_ids.update(citing_ids)
86 if all(id not in encountered_ids for id in citied_ids):
87 unique_entities += 1
88 encountered_ids.update(citied_ids)
90 expected_citations_in_output= 2*3*4
91 # 2 tar * 3 files * 4 citations
93 expected_entities_in_output= 2*3*4*2
94 # 2 tar * 3 files * 4 citations * 2 entities
96 self.assertEqual(expected_entities_in_output, unique_entities)
97 self.assertEqual(expected_citations_in_output, citations_in_output)
99 shutil.rmtree(citations_output_path)
101 meta_files_n = len(list(os.listdir(self.output_dir)))
103 # Make sure that a meta table row was created for each entity and, thus, that the number of
104 entities_in_meta_output = 0
105 for file in os.listdir(self.output_dir):
106 with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
107 entities_in_meta_output += len(list(csv.DictReader(f)))
109 self.assertEqual(expected_entities_in_output, entities_in_meta_output)
110 self.assertEqual(unique_entities, entities_in_meta_output)
112 # make sure that for each of the input files was created a citation file and a meta input file
113 self.assertTrue(meta_files_n == citations_files_n == 6)
115 shutil.rmtree(self.output_dir)
117 for el in os.listdir(self.sample_2tar):
118 if el.endswith("decompr_zip_dir"):
119 shutil.rmtree(os.path.join(self.sample_2tar, el))
121 # os.remove(self.any_db)
123 def test_preprocess_base_decompress_and_read_redis_test(self):
124 """Test base functionalities of the OROCI processor for producing META csv tables and INDEX tables:
125 1) All the files in the TARs in input are correctly processed
126 2) The number of files in input corresponds to the number of files in output (both for citation and for meta tables)
127 3) The number of bibliographic entities corresponds to the number of citations in input *2 (citing + cited)
128 """
129 for el in os.listdir(self.sample_2tar):
130 if el.endswith("decompr_zip_dir"):
131 shutil.rmtree(os.path.join(self.sample_2tar, el))
133 if os.path.exists(self.output_dir):
134 shutil.rmtree(self.output_dir)
136 citations_output_path = self.output_dir + "_citations"
137 if os.path.exists(citations_output_path):
138 shutil.rmtree(citations_output_path)
140 preprocess(openaire_json_dir=self.sample_2tar, csv_dir=self.output_dir, publishers_filepath=self.publishers_file, orcid_doi_filepath=self.doi_orcid, cache=self.cache)
142 citations_in_output = 0
143 encountered_ids = set()
144 unique_entities = 0
146 citations_files_n = len(list(os.listdir(citations_output_path)))
147 for file in os.listdir(citations_output_path):
148 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
149 cits_rows = list(csv.DictReader(f))
150 citations_in_output += len(cits_rows)
151 for x in cits_rows:
152 citing_ids = x["citing"].split(" ")
153 citied_ids = x["referenced"].split(" ")
154 if all(id not in encountered_ids for id in citing_ids):
155 unique_entities += 1
156 encountered_ids.update(citing_ids)
158 if all(id not in encountered_ids for id in citied_ids):
159 unique_entities += 1
160 encountered_ids.update(citied_ids)
162 expected_citations_in_output= 2*3*4
163 # 2 tar * 3 files * 4 citations
165 expected_entities_in_output= 2*3*4*2
166 # 2 tar * 3 files * 4 citations * 2 entities
168 self.assertEqual(expected_entities_in_output, unique_entities)
169 self.assertEqual(expected_citations_in_output, citations_in_output)
171 shutil.rmtree(citations_output_path)
173 meta_files_n = len(list(os.listdir(self.output_dir)))
175 # Make sure that a meta table row was created for each entity and, thus, that the number of
176 entities_in_meta_output = 0
177 for file in os.listdir(self.output_dir):
178 with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
179 entities_in_meta_output += len(list(csv.DictReader(f)))
181 self.assertEqual(expected_entities_in_output, entities_in_meta_output)
182 self.assertEqual(unique_entities, entities_in_meta_output)
184 # make sure that for each of the input files was created a citation file and a meta input file
185 self.assertTrue(meta_files_n == citations_files_n == 6)
187 shutil.rmtree(self.output_dir)
189 for el in os.listdir(self.sample_2tar):
190 if el.endswith("decompr_zip_dir"):
191 shutil.rmtree(os.path.join(self.sample_2tar, el))
193 # os.remove(self.any_db)
195 def test_preprocess_duplicates_management(self):
196 """Test functionalities of the OROCI processor for producing META csv tables and INDEX tables, when multiple
197 citations with a common id involved are processed. Expected output, given two citations with the same citing
198 entity: three rows in meta table, two rows in citations tables
199 1) All the files in the TARs in input are correctly processed
200 2) The number of files in input corresponds to the number of files in output (both for citation and for meta tables)
201 3) The number of bibliographic entities corresponds to the number of citations in input *2 (citing + cited)
202 """
203 for el in os.listdir(self.sample_dupl):
204 if el.endswith("decompr_zip_dir"):
205 shutil.rmtree(os.path.join(self.sample_dupl, el))
207 if os.path.exists(self.output_dir):
208 shutil.rmtree(self.output_dir)
210 citations_output_path = self.output_dir + "_citations"
211 if os.path.exists(citations_output_path):
212 shutil.rmtree(citations_output_path)
213 preprocess(openaire_json_dir=self.sample_dupl, csv_dir=self.output_dir, publishers_filepath=self.publishers_file, orcid_doi_filepath=self.doi_orcid, cache=self.cache, max_workers=2)
215 citations_in_output = 0
216 encountered_ids = set()
217 unique_entities = 0
219 citations_files_n = len(list(os.listdir(citations_output_path)))
220 for file in os.listdir(citations_output_path):
221 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
222 cits_rows = list(csv.DictReader(f))
223 citations_in_output += len(cits_rows)
224 for x in cits_rows:
225 citing_ids = x["citing"].split(" ")
226 citied_ids = x["referenced"].split(" ")
227 if all(id not in encountered_ids for id in citing_ids):
228 unique_entities += 1
229 encountered_ids.update(citing_ids)
231 if all(id not in encountered_ids for id in citied_ids):
232 unique_entities += 1
233 encountered_ids.update(citied_ids)
235 expected_citations_in_output= 2
236 # since the citing entity is the same for both citations
238 expected_entities_in_output= 3
239 # since the citing entity is the same for both citations
241 self.assertEqual(expected_entities_in_output, unique_entities)
242 self.assertEqual(expected_citations_in_output, citations_in_output)
244 shutil.rmtree(citations_output_path)
245 shutil.rmtree(self.output_dir)
247 for el in os.listdir(self.sample_dupl):
248 if el.endswith("decompr_zip_dir"):
249 shutil.rmtree(os.path.join(self.sample_dupl, el))
250 if os.path.exists(self.any_db):
251 os.remove(self.any_db)
253 def test_preprocess_duplicates_management_redis(self):
254 """Test functionalities of the OROCI processor for producing META csv tables and INDEX tables, when multiple
255 citations with a common id involved are processed. Expected output, given two citations with the same citing
256 entity: three rows in meta table, two rows in citations tables
257 1) All the files in the TARs in input are correctly processed
258 2) The number of files in input corresponds to the number of files in output (both for citation and for meta tables)
259 3) The number of bibliographic entities corresponds to the number of citations in input *2 (citing + cited)
260 """
261 for el in os.listdir(self.sample_dupl):
262 if el.endswith("decompr_zip_dir"):
263 shutil.rmtree(os.path.join(self.sample_dupl, el))
265 if os.path.exists(self.output_dir):
266 shutil.rmtree(self.output_dir)
268 citations_output_path = self.output_dir + "_citations"
269 if os.path.exists(citations_output_path):
270 shutil.rmtree(citations_output_path)
272 preprocess(openaire_json_dir=self.sample_dupl, csv_dir=self.output_dir, publishers_filepath=self.publishers_file, orcid_doi_filepath=self.doi_orcid, cache=self.cache, max_workers=2)
274 citations_in_output = 0
275 encountered_ids = set()
276 unique_entities = 0
278 citations_files_n = len(list(os.listdir(citations_output_path)))
279 for file in os.listdir(citations_output_path):
280 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
281 cits_rows = list(csv.DictReader(f))
282 citations_in_output += len(cits_rows)
283 for x in cits_rows:
284 citing_ids = x["citing"].split(" ")
285 citied_ids = x["referenced"].split(" ")
286 if all(id not in encountered_ids for id in citing_ids):
287 unique_entities += 1
288 encountered_ids.update(citing_ids)
290 if all(id not in encountered_ids for id in citied_ids):
291 unique_entities += 1
292 encountered_ids.update(citied_ids)
294 expected_citations_in_output= 2
295 # since the citing entity is the same for both citations
297 expected_entities_in_output= 3
298 # since the citing entity is the same for both citations
300 self.assertEqual(expected_entities_in_output, unique_entities)
301 self.assertEqual(expected_citations_in_output, citations_in_output)
303 shutil.rmtree(citations_output_path)
304 shutil.rmtree(self.output_dir)
306 for el in os.listdir(self.sample_dupl):
307 if el.endswith("decompr_zip_dir"):
308 shutil.rmtree(os.path.join(self.sample_dupl, el))
310 def test_cache(self):
311 '''Nothing should be produced in output, since the cache file reports that all the files in input were completed'''
313 for el in os.listdir(self.sample_2tar):
314 if el.endswith("decompr_zip_dir"):
315 shutil.rmtree(os.path.join(self.sample_2tar, el))
317 if os.path.exists(self.output_dir):
318 shutil.rmtree(self.output_dir)
320 citations_output_path = self.output_dir + "_citations"
321 if os.path.exists(citations_output_path):
322 shutil.rmtree(citations_output_path)
323 with open(self.cache_test1, "w") as write_cache:
324 processed_files_dict = {'part1.tar': {'test/openaire_process/2_tar_sample/part1_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n3.gz': 'completed', 'test/openaire_process/2_tar_sample/part1_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n4.gz': 'completed', 'test/openaire_process/2_tar_sample/part1_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n2.gz': 'completed'}, 'part0.tar': {'test/openaire_process/2_tar_sample/part0_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n7.gz': 'completed', 'test/openaire_process/2_tar_sample/part0_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n5.gz': 'completed', 'test/openaire_process/2_tar_sample/part0_decompr_zip_dir/Volumes/T7_Touch/LAVORO/OROCI/SAMPLE_DATI_OROCI_tmp/reduced_n6.gz': 'completed'}}
325 json.dump(processed_files_dict,write_cache)
327 preprocess(openaire_json_dir=self.sample_2tar, csv_dir=self.output_dir, publishers_filepath=self.publishers_file, orcid_doi_filepath=self.doi_orcid, cache=self.cache_test1, max_workers=2, target=2)
329 citations_in_output = 0
330 encountered_ids = set()
331 unique_entities = 0
333 citations_files_n = len(list(os.listdir(citations_output_path)))
334 for file in os.listdir(citations_output_path):
335 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
336 cits_rows = list(csv.DictReader(f))
337 citations_in_output += len(cits_rows)
338 for x in cits_rows:
339 citing_ids = x["citing"].split(" ")
340 citied_ids = x["referenced"].split(" ")
341 if all(id not in encountered_ids for id in citing_ids):
342 unique_entities += 1
343 encountered_ids.update(citing_ids)
345 if all(id not in encountered_ids for id in citied_ids):
346 unique_entities += 1
347 encountered_ids.update(citied_ids)
349 expected_citations_in_output= 0
350 # since the citing entity is the same for both citations
352 expected_entities_in_output= 0
353 # since the citing entity is the same for both citations
355 self.assertEqual(expected_entities_in_output, unique_entities)
356 self.assertEqual(expected_citations_in_output, citations_in_output)
358 shutil.rmtree(citations_output_path)
359 shutil.rmtree(self.output_dir)
361 for el in os.listdir(self.sample_2tar):
362 if el.endswith("decompr_zip_dir"):
363 shutil.rmtree(os.path.join(self.sample_2tar, el))
# Allow running this module directly: delegate to unittest's CLI test runner.
if __name__ == "__main__":
    unittest.main()