Coverage for test / datacite_process_test.py: 71%
213 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2024 Arianna Moretti <arianna.moretti4@unibo.it>
2# SPDX-FileCopyrightText: 2024-2026 Marta Soricetti <marta.soricetti@unibo.it>
3# SPDX-FileCopyrightText: 2025 Arianna Moretti <arianna.moretti4@unibo.it>
4# SPDX-FileCopyrightText: 2026 Arcangelo Massari <arcangelo.massari@unibo.it>
5#
6# SPDX-License-Identifier: ISC
8import csv
9import json
10import os
11import shutil
12import unittest
14from oc_ds_converter.run.datacite_process import preprocess
class DataciteProcessTest(unittest.TestCase):
    """Integration tests for the DataCite ``preprocess`` entry point:
    production of META CSV tables and citation INDEX tables, behaviour with
    the ORCID API disabled, cache-driven skipping, and Redis-backed storage.
    """

    def setUp(self) -> None:
        # Fixture locations: inputs live under test/, outputs are (re)created per test.
        self.test_dir = os.path.join("test", 'datacite_process')
        self.json_dir = os.path.join(self.test_dir, 'jsonFiles')
        self.output_dir = os.path.join(self.test_dir, 'output_dir')
        self.processing_test_dir = os.path.join('test', 'datacite_processing')
        self.publisher_mapping = os.path.join(self.processing_test_dir, 'publishers.csv')
        self.wanted_dois = os.path.join(self.processing_test_dir, 'wanted_dois')
        self.iod = os.path.join(self.processing_test_dir, 'iod')
        self.cache = os.path.join(self.test_dir, 'cache.json')
        self.cache_test = os.path.join(self.test_dir, 'cache_test.json')
        self.db = os.path.join(self.test_dir, 'anydb.db')
        # input path containing the malformed NDJSON file
        self.error_input_folder = os.path.join(self.test_dir, 'sample_dc_error')
        # paths for error reports
        self.bad_dir = os.path.join(self.output_dir, '_bad')
        self.citations_output_path = self.output_dir + "_citations"

    def tearDown(self) -> None:
        # Guarantee cleanup even when an assertion fails mid-test: the former
        # inline "post-clean" blocks were skipped on failure, leaking artifacts.
        self._clean_artifacts()

    def _clean_artifacts(self) -> None:
        # Remove every artifact a run may leave behind: the _bad error dir,
        # the citations dir, the META output dir, the db file and BOTH cache
        # files (the original never removed self.cache_test, leaking it).
        for directory in (self.bad_dir, self.citations_output_path, self.output_dir):
            if os.path.exists(directory):
                shutil.rmtree(directory)
        for file_path in (self.db, self.cache, self.cache_test):
            if os.path.exists(file_path):
                os.remove(file_path)

    def test_preprocess_base_decompress_and_read(self):
        """Test base functionalities of the Datacite processor for producing META csv tables and INDEX tables:
        1) All the files in input dir are correctly processed
        2) The number of files in input corresponds to the number of files in output for citations
        3) The number of files in input are duplicated in the output folder for both citing and cited entities
        """
        self._clean_artifacts()

        preprocess(datacite_json_dir=self.json_dir, publishers_filepath=self.publisher_mapping,
                   orcid_doi_filepath=self.iod, csv_dir=self.output_dir, redis_storage_manager=False,
                   storage_path=self.db, cache=self.cache)

        citations_in_output = 0
        for file in os.listdir(self.citations_output_path):
            with open(os.path.join(self.citations_output_path, file), 'r', encoding='utf-8') as f:
                citations_in_output += len(list(csv.DictReader(f)))

        # one self citation must not be considered
        expected_citations_in_output = 19
        # excluding duplicated entities and one invalid doi 10.46979/rbn.v52i4.5546
        expected_entities_in_output = 22

        self.assertEqual(expected_citations_in_output, citations_in_output)

        citations_files_n = len(os.listdir(self.citations_output_path))
        shutil.rmtree(self.citations_output_path)

        entities_in_meta_output = 0
        for file in os.listdir(self.output_dir):
            with open(os.path.join(self.output_dir, file), 'r', encoding='utf-8') as f:
                entities_in_meta_output += len(list(csv.DictReader(f)))

        self.assertEqual(expected_entities_in_output, entities_in_meta_output)

        input_files_n = 2
        # assertEqual gives a useful diff on failure, unlike assertTrue(a == b)
        self.assertEqual(input_files_n, citations_files_n)

    def test_preprocess_orcid_api_disabled_no_index(self):
        """
        With the ORCID API disabled and without a DOI->ORCID index,
        ORCIDs must not appear in _subject.csv files.
        """
        self._clean_artifacts()

        # Run with API disabled and no index
        preprocess(
            datacite_json_dir=self.json_dir,
            publishers_filepath=self.publisher_mapping,
            orcid_doi_filepath=None,
            csv_dir=self.output_dir,
            cache=self.cache,
            use_orcid_api=False
        )

        # Verify: no "[orcid:" in any _subject.csv "author" field
        found_orcid = False
        for file in os.listdir(self.output_dir):
            if not file.endswith("_subject.csv"):
                continue
            with open(os.path.join(self.output_dir, file), "r", encoding="utf-8") as f:
                if any("[orcid:" in (row.get("author", "") or "") for row in csv.DictReader(f)):
                    found_orcid = True
                    break

        self.assertFalse(found_orcid)

    def test_preprocess_orcid_api_disabled_no_leak(self):
        """With ORCID API disabled, authors should not contain [orcid:] unless the DOI is in the provided index.
        Our sample input DOIs with authors having ORCID nameIdentifiers are not covered by the sample index (iod),
        so no [orcid:] should appear in the subject CSVs."""
        self._clean_artifacts()

        # Run the process with ORCID API disabled
        preprocess(
            datacite_json_dir=self.json_dir,
            publishers_filepath=self.publisher_mapping,
            orcid_doi_filepath=self.iod,
            csv_dir=self.output_dir,
            cache=self.cache,
            use_orcid_api=False,
        )

        # Scan subject CSVs and ensure authors contain no "[orcid:" token
        subject_rows = 0
        orcid_mentions = 0
        for fname in os.listdir(self.output_dir):
            if fname.endswith("_subject.csv"):
                with open(os.path.join(self.output_dir, fname), encoding="utf-8") as f:
                    for row in csv.DictReader(f):
                        subject_rows += 1
                        if "[orcid:" in row.get("author", ""):
                            orcid_mentions += 1

        # Guard against vacuous success: there must be rows to inspect.
        self.assertGreater(subject_rows, 0)
        self.assertEqual(orcid_mentions, 0)

    def test_any_db_creation_redis_no_testing(self):
        # Exercise preprocess with the Redis storage manager against a real
        # (non-testing) Redis instance; skipped when Redis is unreachable.
        self._clean_artifacts()

        try:
            # Lazy import: the original referenced RedisStorageManager without
            # importing it, so the bare except swallowed a NameError and this
            # test ALWAYS skipped, even with Redis running.
            # NOTE(review): import path inferred from the oc_ds_converter
            # package layout — confirm against the project.
            from oc_ds_converter.oc_idmanager.oc_data_storage.redis_manager import RedisStorageManager
            rsm = RedisStorageManager(testing=False)
            rsm.set_value("TEST VALUE", False)
            run_test = True
        except Exception:
            run_test = False
            print("test skipped: 'test_any_db_creation_redis_no_testing': Connect to redis before running the test")

        if run_test:
            rsm.del_value("TEST VALUE")
            if not rsm.get_all_keys():
                preprocess(datacite_json_dir=self.json_dir, publishers_filepath=self.publisher_mapping,
                           orcid_doi_filepath=self.iod, csv_dir=self.output_dir, redis_storage_manager=True,
                           storage_path=self.db, cache=self.cache)
                rsm.delete_storage()
            else:
                # the skip message previously named the wrong test
                print("test skipped: 'test_any_db_creation_redis_no_testing' because redis db 2 is not empty")

    def test_cache(self):
        'Nothing should be produced in output, since the cache file reports that all the files in input were completed'
        self._clean_artifacts()

        # Mark both input files as fully processed in both iterations.
        with open(self.cache_test, "w", encoding="utf-8") as write_cache:
            processed_files_dict = {'first_iteration': ['jSonFile_1', 'jSonFile_2'],
                                    'second_iteration': ['jSonFile_1', 'jSonFile_2']}
            json.dump(processed_files_dict, write_cache)

        preprocess(datacite_json_dir=self.json_dir, publishers_filepath=self.publisher_mapping,
                   orcid_doi_filepath=self.iod, csv_dir=self.output_dir, redis_storage_manager=False,
                   storage_path=self.db, cache=self.cache_test)

        citations_in_output = 0
        encountered_ids = set()
        unique_entities = 0

        for file in os.listdir(self.citations_output_path):
            with open(os.path.join(self.citations_output_path, file), 'r', encoding='utf-8') as f:
                cits_rows = list(csv.DictReader(f))
                citations_in_output += len(cits_rows)
                for row in cits_rows:
                    citing_ids = row["citing"].split(" ")
                    cited_ids = row["cited"].split(" ")
                    # "identifier" avoids shadowing the builtin ``id``
                    if all(identifier not in encountered_ids for identifier in citing_ids):
                        unique_entities += 1
                        encountered_ids.update(citing_ids)
                    if all(identifier not in encountered_ids for identifier in cited_ids):
                        unique_entities += 1
                        encountered_ids.update(cited_ids)

        # The cache declares every input file complete, so nothing is produced.
        expected_citations_in_output = 0
        expected_entities_in_output = 0

        self.assertEqual(expected_entities_in_output, unique_entities)
        self.assertEqual(expected_citations_in_output, citations_in_output)
# Allow running this test module directly (e.g. ``python datacite_process_test.py``).
if __name__ == '__main__':
    unittest.main()