Coverage for test/crossref_process_test.py: 82%
269 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it>
# SPDX-FileCopyrightText: 2024 Arianna Moretti <arianna.moretti4@unibo.it>
# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
# SPDX-FileCopyrightText: 2025 Arianna Moretti <arianna.moretti4@unibo.it>
#
# SPDX-License-Identifier: ISC
8import csv
9import json
10import os.path
11import shutil
12import tarfile
13import unittest
14from os.path import basename, join
15from pathlib import Path
17from oc_ds_converter.run.crossref_process import preprocess
20class CrossrefProcessTest(unittest.TestCase):
21 def setUp(self) -> None:
22 self.test_dir = os.path.join('test', 'crossref_processing')
23 self.targz_input_folder = os.path.join(self.test_dir, 'tar_gz_test')
24 self.targz_input = os.path.join(self.targz_input_folder, '40228.tar.gz')
25 self.output = os.path.join(self.test_dir, 'output_dir')
26 self.wanted_dois = os.path.join(self.test_dir, 'wanted_dois')
27 self.iod = os.path.join(self.test_dir, 'iod')
28 self.cache = os.path.join(self.test_dir, 'cache.json')
29 self.db = os.path.join(self.test_dir, 'anydb.db')
30 self.targz_cited_folder = os.path.join(self.test_dir, 'tar_gz_cited_test')
31 self.targz_cited_input = os.path.join(self.targz_cited_folder, '3.json.tar.gz')
32 self.gzip_input = os.path.join(self.test_dir, 'gzip_test')
33 self.sample_fake_dump_dir = os.path.join(self.test_dir, 'tar_gz_wrong_cited_doi')
34 self.sample_fake_dump = os.path.join(self.sample_fake_dump_dir, '1.tar.gz')
35 self.any_db1 = join(self.test_dir, "anydb1.db")
37 def test_preprocess_base_decompress_and_read_without_cited(self):
38 """CASE 1: compressed input without cited entities"""
39 if os.path.exists(self.output):
40 shutil.rmtree(self.output)
42 citations_output_path = self.output + "_citations"
43 if os.path.exists(citations_output_path):
44 shutil.rmtree(citations_output_path)
46 preprocess(self.targz_input, orcid_doi_filepath=self.iod, csv_dir=self.output, cache=self.cache)
48 citations_in_output = 0
49 encountered_ids = set()
50 unique_entities = 0
52 for file in os.listdir(citations_output_path):
53 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
54 cits_rows = list(csv.DictReader(f))
55 citations_in_output += len(cits_rows)
56 for x in cits_rows:
57 citing_ids = x["citing"].split(" ")
58 citied_ids = x["cited"].split(" ")
59 if all(id not in encountered_ids for id in citing_ids):
60 unique_entities += 1
61 encountered_ids.update(citing_ids)
62 if all(id not in encountered_ids for id in citied_ids):
63 unique_entities += 1
64 encountered_ids.update(citied_ids)
65 expected_entities_in_output = 0
66 expected_citations_in_output = 0
67 self.assertEqual(expected_entities_in_output, unique_entities)
68 self.assertEqual(expected_citations_in_output, citations_in_output)
70 shutil.rmtree(self.output)
71 shutil.rmtree(citations_output_path)
73 def test_preprocess_base_and_decompress_with_cited(self):
74 """CASE2: compressed input with cited entities"""
75 if os.path.exists(self.output):
76 shutil.rmtree(self.output)
78 citations_output_path = self.output + "_citations"
79 if os.path.exists(citations_output_path):
80 shutil.rmtree(citations_output_path)
82 preprocess(crossref_json_dir=self.targz_cited_input, orcid_doi_filepath=self.iod, csv_dir=self.output, cache=self.cache)
83 citations_in_output = 0
84 encountered_ids = set()
85 unique_entities = 0
87 for file in os.listdir(citations_output_path):
88 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
89 cits_rows = list(csv.DictReader(f))
90 citations_in_output += len(cits_rows)
91 for x in cits_rows:
92 citing_ids = x["citing"].split(" ")
93 citied_ids = x["cited"].split(" ")
94 if all(id not in encountered_ids for id in citing_ids):
95 unique_entities += 1
96 encountered_ids.update(citing_ids)
97 if all(id not in encountered_ids for id in citied_ids):
98 unique_entities += 1
99 encountered_ids.update(citied_ids)
100 expected_entities_in_output = 17
101 expected_citations_in_output = 16
102 self.assertEqual(expected_entities_in_output, unique_entities)
103 self.assertEqual(expected_citations_in_output, citations_in_output)
105 citations_files_n = len(list(os.listdir(citations_output_path)))
107 shutil.rmtree(citations_output_path)
109 meta_files_n = len(list(os.listdir(self.output)))
111 # Make sure that a meta table row was created for each entity
112 entities_in_meta_output = 0
113 for file in os.listdir(self.output):
114 with open(os.path.join(self.output, file), 'r', encoding='utf-8') as f:
115 entities_in_meta_output += len(list(csv.DictReader(f)))
117 self.assertEqual(expected_entities_in_output, entities_in_meta_output)
118 self.assertEqual(unique_entities, entities_in_meta_output)
120 # make sure that for each of the input files was created a citation file and two meta input file
121 self.assertTrue(meta_files_n == 2)
122 self.assertTrue(citations_files_n == 1)
124 shutil.rmtree(self.output)
125 if os.path.exists(self.db):
126 os.remove(self.db)
128 def test_preprocess_base_and_decompress_with_cited_redis(self):
129 """CASE2: compressed input with cited entities"""
130 if os.path.exists(self.output):
131 shutil.rmtree(self.output)
133 citations_output_path = self.output + "_citations"
134 if os.path.exists(citations_output_path):
135 shutil.rmtree(citations_output_path)
137 preprocess(crossref_json_dir=self.targz_cited_input, orcid_doi_filepath=self.iod, csv_dir=self.output, cache=self.cache)
138 citations_in_output = 0
139 encountered_ids = set()
140 unique_entities = 0
142 for file in os.listdir(citations_output_path):
143 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
144 cits_rows = list(csv.DictReader(f))
145 citations_in_output += len(cits_rows)
146 for x in cits_rows:
147 citing_ids = x["citing"].split(" ")
148 citied_ids = x["cited"].split(" ")
149 if all(id not in encountered_ids for id in citing_ids):
150 unique_entities += 1
151 encountered_ids.update(citing_ids)
152 if all(id not in encountered_ids for id in citied_ids):
153 unique_entities += 1
154 encountered_ids.update(citied_ids)
155 expected_entities_in_output = 17
156 expected_citations_in_output = 16
157 self.assertEqual(expected_entities_in_output, unique_entities)
158 self.assertEqual(expected_citations_in_output, citations_in_output)
160 citations_files_n = len(list(os.listdir(citations_output_path)))
162 shutil.rmtree(citations_output_path)
164 meta_files_n = len(list(os.listdir(self.output)))
166 # Make sure that a meta table row was created for each entity
167 entities_in_meta_output = 0
168 for file in os.listdir(self.output):
169 with open(os.path.join(self.output, file), 'r', encoding='utf-8') as f:
170 entities_in_meta_output += len(list(csv.DictReader(f)))
172 self.assertEqual(expected_entities_in_output, entities_in_meta_output)
173 self.assertEqual(unique_entities, entities_in_meta_output)
175 # make sure that for each of the input files was created a citation file and two meta input file
176 self.assertTrue(meta_files_n == 2)
177 self.assertTrue(citations_files_n == 1)
179 shutil.rmtree(self.output)
180 #os.remove(self.any_db1)
182 def test_preprocess_wrong_doi_cited(self):
184 if os.path.exists(self.output):
185 shutil.rmtree(self.output)
187 citations_output_path = self.output + "_citations"
188 if os.path.exists(citations_output_path):
189 shutil.rmtree(citations_output_path)
191 preprocess(self.sample_fake_dump, orcid_doi_filepath=self.iod, csv_dir=self.output, cache=self.cache)
193 citations_in_output = 0
194 encountered_ids = set()
195 unique_entities = 0
197 for file in os.listdir(citations_output_path):
198 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
199 cits_rows = list(csv.DictReader(f))
200 citations_in_output += len(cits_rows)
201 for x in cits_rows:
202 citing_ids = x["citing"].split(" ")
203 citied_ids = x["cited"].split(" ")
204 if all(id not in encountered_ids for id in citing_ids):
205 unique_entities += 1
206 encountered_ids.update(citing_ids)
207 if all(id not in encountered_ids for id in citied_ids):
208 unique_entities += 1
209 encountered_ids.update(citied_ids)
211 expected_citations_in_output = 15
213 expected_entities_in_output = 16
215 self.assertEqual(expected_entities_in_output, unique_entities)
216 self.assertEqual(expected_citations_in_output, citations_in_output)
218 shutil.rmtree(self.output)
219 shutil.rmtree(citations_output_path)
221 if os.path.exists(self.db):
222 os.remove(self.db)
224 def test_cache(self):
225 'Nothing should be produced in output, since the cache file reports that all the files in input were completed'
227 if os.path.exists(self.output):
228 shutil.rmtree(self.output)
230 citations_output_path = self.output + "_citations"
231 if os.path.exists(citations_output_path):
232 shutil.rmtree(citations_output_path)
233 cache_dict = {'citing': [], 'cited': []}
234 targz_fd = tarfile.open(self.targz_cited_input, "r:gz", encoding="utf-8")
235 for cur_file in targz_fd:
236 if cur_file.name.endswith('.json') and not basename(cur_file.name).startswith("."):
237 cache_dict['citing'].append(Path(cur_file.name).name)
238 cache_dict['cited'].append(Path(cur_file.name).name)
240 with open(self.cache, "w") as write_cache:
241 json.dump(cache_dict, write_cache)
243 preprocess(crossref_json_dir=self.targz_cited_input,
244 orcid_doi_filepath=self.iod, csv_dir=self.output, cache=self.cache)
246 citations_in_output = 0
247 encountered_ids = set()
248 unique_entities = 0
250 for file in os.listdir(citations_output_path):
251 with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
252 cits_rows = list(csv.DictReader(f))
253 citations_in_output += len(cits_rows)
254 for x in cits_rows:
255 citing_ids = x["citing"].split(" ")
256 citied_ids = x["cited"].split(" ")
257 if all(id not in encountered_ids for id in citing_ids):
258 unique_entities += 1
259 encountered_ids.update(citing_ids)
261 if all(id not in encountered_ids for id in citied_ids):
262 unique_entities += 1
263 encountered_ids.update(citied_ids)
265 expected_citations_in_output = 0
267 expected_entities_in_output = 0
269 self.assertEqual(expected_entities_in_output, unique_entities)
270 self.assertEqual(expected_citations_in_output, citations_in_output)
272 shutil.rmtree(citations_output_path)
273 shutil.rmtree(self.output)
275 def test_preprocess_orcid_api_disabled_no_index(self):
276 """
277 With the ORCID API disabled and without a DOI->ORCID index,
278 ORCIDs must not appear in _citing.csv files.
279 """
280 if os.path.exists(self.output):
281 shutil.rmtree(self.output)
282 citations_output_path = self.output + "_citations"
283 if os.path.exists(citations_output_path):
284 shutil.rmtree(citations_output_path)
286 preprocess(
287 crossref_json_dir=self.targz_cited_input,
288 orcid_doi_filepath=None,
289 csv_dir=self.output,
290 cache=self.cache,
291 use_orcid_api=False
292 )
294 found_orcid = False
295 for file in os.listdir(self.output):
296 if file.endswith("_citing.csv"):
297 with open(os.path.join(self.output, file), "r", encoding="utf-8") as f:
298 for row in csv.DictReader(f):
299 if "[orcid:" in (row.get("author", "") or ""):
300 found_orcid = True
301 break
302 if found_orcid:
303 break
305 self.assertFalse(found_orcid)
307 if os.path.exists(citations_output_path):
308 shutil.rmtree(citations_output_path)
309 if os.path.exists(self.output):
310 shutil.rmtree(self.output)
311 if os.path.exists(self.db):
312 os.remove(self.db)
314 def test_preprocess_orcid_api_disabled_no_leak(self):
315 """With ORCID API disabled, authors should not contain [orcid:] unless the DOI is in the provided index."""
316 if os.path.exists(self.output):
317 shutil.rmtree(self.output)
318 citations_output_path = self.output + "_citations"
319 if os.path.exists(citations_output_path):
320 shutil.rmtree(citations_output_path)
322 preprocess(
323 crossref_json_dir=self.targz_cited_input,
324 orcid_doi_filepath=self.iod,
325 csv_dir=self.output,
326 cache=self.cache,
327 use_orcid_api=False
328 )
330 subject_rows = 0
331 orcid_mentions = 0
332 for fname in os.listdir(self.output):
333 if fname.endswith("_citing.csv"):
334 with open(os.path.join(self.output, fname), encoding="utf-8") as f:
335 rdr = csv.DictReader(f)
336 for row in rdr:
337 subject_rows += 1
338 if "[orcid:" in row.get("author", ""):
339 orcid_mentions += 1
341 self.assertGreater(subject_rows, 0)
342 self.assertEqual(orcid_mentions, 0)
344 shutil.rmtree(citations_output_path)
345 shutil.rmtree(self.output)
346 if os.path.exists(self.db):
347 os.remove(self.db)
349 def test_preprocess_filters_entities_without_doi_references(self):
350 """
351 Only entities with at least one reference containing a DOI should be
352 included in the citing entities output. Entities without the reference
353 field, with an empty reference array, or with references lacking DOIs
354 should be excluded.
355 """
356 reference_filter_input = os.path.join(self.test_dir, 'reference_filter_test')
358 if os.path.exists(self.output):
359 shutil.rmtree(self.output)
360 citations_output_path = self.output + "_citations"
361 if os.path.exists(citations_output_path):
362 shutil.rmtree(citations_output_path)
364 preprocess(
365 crossref_json_dir=reference_filter_input,
366 orcid_doi_filepath=None,
367 csv_dir=self.output,
368 cache=self.cache,
369 use_orcid_api=False
370 )
372 # Count citing entities in output
373 citing_entities = []
374 for fname in os.listdir(self.output):
375 if fname.endswith("_citing.csv"):
376 with open(os.path.join(self.output, fname), encoding="utf-8") as f:
377 for row in csv.DictReader(f):
378 citing_entities.append(row)
380 # Only the entity with DOI "10.1234/reference-with-doi" should be in the output
381 self.assertEqual(len(citing_entities), 1)
382 self.assertIn("doi:10.1234/reference-with-doi", citing_entities[0]["id"])
384 shutil.rmtree(citations_output_path)
385 shutil.rmtree(self.output)
# Allow running this test module directly (e.g. `python crossref_process_test.py`).
if __name__ == '__main__':
    unittest.main()