Coverage for test/crossref_process_test.py: 82%

269 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Marta Soricetti <marta.soricetti@unibo.it> 

2# SPDX-FileCopyrightText: 2024 Arianna Moretti <arianna.moretti4@unibo.it> 

3# SPDX-FileCopyrightText: 2024-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

4# SPDX-FileCopyrightText: 2025 Arianna Moretti <arianna.moretti4@unibo.it> 

5# 

6# SPDX-License-Identifier: ISC 

7 

8import csv 

9import json 

10import os.path 

11import shutil 

12import tarfile 

13import unittest 

14from os.path import basename, join 

15from pathlib import Path 

16 

17from oc_ds_converter.run.crossref_process import preprocess 

18 

19 

class CrossrefProcessTest(unittest.TestCase):
    """Integration tests for oc_ds_converter.run.crossref_process.preprocess.

    Each test runs ``preprocess`` on a small fixture dump under
    ``test/crossref_processing`` and checks the produced citation CSVs
    and/or Meta tables, then removes every artifact it created.
    """

    def setUp(self) -> None:
        # All fixtures live under test/crossref_processing.
        self.test_dir = join('test', 'crossref_processing')
        self.targz_input_folder = join(self.test_dir, 'tar_gz_test')
        self.targz_input = join(self.targz_input_folder, '40228.tar.gz')
        self.output = join(self.test_dir, 'output_dir')
        self.wanted_dois = join(self.test_dir, 'wanted_dois')
        self.iod = join(self.test_dir, 'iod')
        self.cache = join(self.test_dir, 'cache.json')
        self.db = join(self.test_dir, 'anydb.db')
        self.targz_cited_folder = join(self.test_dir, 'tar_gz_cited_test')
        self.targz_cited_input = join(self.targz_cited_folder, '3.json.tar.gz')
        self.gzip_input = join(self.test_dir, 'gzip_test')
        self.sample_fake_dump_dir = join(self.test_dir, 'tar_gz_wrong_cited_doi')
        self.sample_fake_dump = join(self.sample_fake_dump_dir, '1.tar.gz')
        self.any_db1 = join(self.test_dir, "anydb1.db")

    def _citations_output_path(self) -> str:
        """Directory where preprocess writes the citation CSVs."""
        return self.output + "_citations"

    def _remove_output_dirs(self) -> None:
        """Delete leftover output directories from a previous (failed) run."""
        for path in (self.output, self._citations_output_path()):
            if os.path.exists(path):
                shutil.rmtree(path)

    def _count_citations_and_entities(self, citations_output_path: str) -> tuple:
        """Scan every citation CSV and return (citations, unique_entities).

        A "citing"/"cited" cell holds the space-separated ids of one entity;
        an entity is counted as new only if none of its ids was seen before.
        NOTE: ids are recorded only when the entity is counted as new,
        mirroring the original per-test counting logic exactly.
        """
        citations_in_output = 0
        encountered_ids = set()
        unique_entities = 0
        for file in os.listdir(citations_output_path):
            with open(os.path.join(citations_output_path, file), 'r', encoding='utf-8') as f:
                cits_rows = list(csv.DictReader(f))
                citations_in_output += len(cits_rows)
                for row in cits_rows:
                    citing_ids = row["citing"].split(" ")
                    cited_ids = row["cited"].split(" ")
                    if all(identifier not in encountered_ids for identifier in citing_ids):
                        unique_entities += 1
                        encountered_ids.update(citing_ids)
                    if all(identifier not in encountered_ids for identifier in cited_ids):
                        unique_entities += 1
                        encountered_ids.update(cited_ids)
        return citations_in_output, unique_entities

    def _count_meta_rows(self) -> int:
        """Count rows across every Meta CSV table in the output directory."""
        rows = 0
        for file in os.listdir(self.output):
            with open(os.path.join(self.output, file), 'r', encoding='utf-8') as f:
                rows += len(list(csv.DictReader(f)))
        return rows

    def test_preprocess_base_decompress_and_read_without_cited(self):
        """CASE 1: compressed input without cited entities"""
        citations_output_path = self.output + "_citations"
        self._remove_output_dirs()

        preprocess(self.targz_input, orcid_doi_filepath=self.iod, csv_dir=self.output, cache=self.cache)

        citations_in_output, unique_entities = self._count_citations_and_entities(citations_output_path)
        # No reference in this dump carries a DOI, so nothing is produced.
        expected_entities_in_output = 0
        expected_citations_in_output = 0
        self.assertEqual(expected_entities_in_output, unique_entities)
        self.assertEqual(expected_citations_in_output, citations_in_output)

        shutil.rmtree(self.output)
        shutil.rmtree(citations_output_path)

    def test_preprocess_base_and_decompress_with_cited(self):
        """CASE2: compressed input with cited entities"""
        citations_output_path = self.output + "_citations"
        self._remove_output_dirs()

        preprocess(crossref_json_dir=self.targz_cited_input, orcid_doi_filepath=self.iod,
                   csv_dir=self.output, cache=self.cache)

        citations_in_output, unique_entities = self._count_citations_and_entities(citations_output_path)
        expected_entities_in_output = 17
        expected_citations_in_output = 16
        self.assertEqual(expected_entities_in_output, unique_entities)
        self.assertEqual(expected_citations_in_output, citations_in_output)

        citations_files_n = len(os.listdir(citations_output_path))
        shutil.rmtree(citations_output_path)

        meta_files_n = len(os.listdir(self.output))

        # Make sure that a meta table row was created for each entity
        entities_in_meta_output = self._count_meta_rows()
        self.assertEqual(expected_entities_in_output, entities_in_meta_output)
        self.assertEqual(unique_entities, entities_in_meta_output)

        # make sure that for each of the input files was created a citation file and two meta input file
        self.assertEqual(2, meta_files_n)
        self.assertEqual(1, citations_files_n)

        shutil.rmtree(self.output)
        if os.path.exists(self.db):
            os.remove(self.db)

    def test_preprocess_base_and_decompress_with_cited_redis(self):
        """CASE2: compressed input with cited entities"""
        citations_output_path = self.output + "_citations"
        self._remove_output_dirs()

        preprocess(crossref_json_dir=self.targz_cited_input, orcid_doi_filepath=self.iod,
                   csv_dir=self.output, cache=self.cache)

        citations_in_output, unique_entities = self._count_citations_and_entities(citations_output_path)
        expected_entities_in_output = 17
        expected_citations_in_output = 16
        self.assertEqual(expected_entities_in_output, unique_entities)
        self.assertEqual(expected_citations_in_output, citations_in_output)

        citations_files_n = len(os.listdir(citations_output_path))
        shutil.rmtree(citations_output_path)

        meta_files_n = len(os.listdir(self.output))

        # Make sure that a meta table row was created for each entity
        entities_in_meta_output = self._count_meta_rows()
        self.assertEqual(expected_entities_in_output, entities_in_meta_output)
        self.assertEqual(unique_entities, entities_in_meta_output)

        # make sure that for each of the input files was created a citation file and two meta input file
        self.assertEqual(2, meta_files_n)
        self.assertEqual(1, citations_files_n)

        shutil.rmtree(self.output)
        # os.remove(self.any_db1)

    def test_preprocess_wrong_doi_cited(self):
        """A cited entity with an invalid DOI must be dropped from the output."""
        citations_output_path = self.output + "_citations"
        self._remove_output_dirs()

        preprocess(self.sample_fake_dump, orcid_doi_filepath=self.iod, csv_dir=self.output, cache=self.cache)

        citations_in_output, unique_entities = self._count_citations_and_entities(citations_output_path)
        # One citation (and its cited entity) is lost to the invalid DOI.
        expected_citations_in_output = 15
        expected_entities_in_output = 16
        self.assertEqual(expected_entities_in_output, unique_entities)
        self.assertEqual(expected_citations_in_output, citations_in_output)

        shutil.rmtree(self.output)
        shutil.rmtree(citations_output_path)

        if os.path.exists(self.db):
            os.remove(self.db)

    def test_cache(self):
        'Nothing should be produced in output, since the cache file reports that all the files in input were completed'
        citations_output_path = self.output + "_citations"
        self._remove_output_dirs()

        # Mark every JSON file in the archive as already processed, for both
        # the citing and the cited pass.
        cache_dict = {'citing': [], 'cited': []}
        # Context manager so the archive handle is always released.
        with tarfile.open(self.targz_cited_input, "r:gz", encoding="utf-8") as targz_fd:
            for cur_file in targz_fd:
                if cur_file.name.endswith('.json') and not basename(cur_file.name).startswith("."):
                    cache_dict['citing'].append(Path(cur_file.name).name)
                    cache_dict['cited'].append(Path(cur_file.name).name)

        with open(self.cache, "w") as write_cache:
            json.dump(cache_dict, write_cache)

        preprocess(crossref_json_dir=self.targz_cited_input,
                   orcid_doi_filepath=self.iod, csv_dir=self.output, cache=self.cache)

        citations_in_output, unique_entities = self._count_citations_and_entities(citations_output_path)
        expected_citations_in_output = 0
        expected_entities_in_output = 0
        self.assertEqual(expected_entities_in_output, unique_entities)
        self.assertEqual(expected_citations_in_output, citations_in_output)

        shutil.rmtree(citations_output_path)
        shutil.rmtree(self.output)

    def test_preprocess_orcid_api_disabled_no_index(self):
        """
        With the ORCID API disabled and without a DOI->ORCID index,
        ORCIDs must not appear in _citing.csv files.
        """
        citations_output_path = self.output + "_citations"
        self._remove_output_dirs()

        preprocess(
            crossref_json_dir=self.targz_cited_input,
            orcid_doi_filepath=None,
            csv_dir=self.output,
            cache=self.cache,
            use_orcid_api=False
        )

        found_orcid = False
        for file in os.listdir(self.output):
            if file.endswith("_citing.csv"):
                with open(os.path.join(self.output, file), "r", encoding="utf-8") as f:
                    for row in csv.DictReader(f):
                        if "[orcid:" in (row.get("author", "") or ""):
                            found_orcid = True
                            break
            if found_orcid:
                break

        self.assertFalse(found_orcid)

        self._remove_output_dirs()
        if os.path.exists(self.db):
            os.remove(self.db)

    def test_preprocess_orcid_api_disabled_no_leak(self):
        """With ORCID API disabled, authors should not contain [orcid:] unless the DOI is in the provided index."""
        citations_output_path = self.output + "_citations"
        self._remove_output_dirs()

        preprocess(
            crossref_json_dir=self.targz_cited_input,
            orcid_doi_filepath=self.iod,
            csv_dir=self.output,
            cache=self.cache,
            use_orcid_api=False
        )

        subject_rows = 0
        orcid_mentions = 0
        for fname in os.listdir(self.output):
            if fname.endswith("_citing.csv"):
                with open(os.path.join(self.output, fname), encoding="utf-8") as f:
                    for row in csv.DictReader(f):
                        subject_rows += 1
                        if "[orcid:" in row.get("author", ""):
                            orcid_mentions += 1

        # There must be output rows, and none of them may carry an ORCID.
        self.assertGreater(subject_rows, 0)
        self.assertEqual(orcid_mentions, 0)

        shutil.rmtree(citations_output_path)
        shutil.rmtree(self.output)
        if os.path.exists(self.db):
            os.remove(self.db)

    def test_preprocess_filters_entities_without_doi_references(self):
        """
        Only entities with at least one reference containing a DOI should be
        included in the citing entities output. Entities without the reference
        field, with an empty reference array, or with references lacking DOIs
        should be excluded.
        """
        reference_filter_input = join(self.test_dir, 'reference_filter_test')
        citations_output_path = self.output + "_citations"
        self._remove_output_dirs()

        preprocess(
            crossref_json_dir=reference_filter_input,
            orcid_doi_filepath=None,
            csv_dir=self.output,
            cache=self.cache,
            use_orcid_api=False
        )

        # Collect every citing entity row from the output tables.
        citing_entities = []
        for fname in os.listdir(self.output):
            if fname.endswith("_citing.csv"):
                with open(os.path.join(self.output, fname), encoding="utf-8") as f:
                    citing_entities.extend(csv.DictReader(f))

        # Only the entity with DOI "10.1234/reference-with-doi" should be in the output
        self.assertEqual(len(citing_entities), 1)
        self.assertIn("doi:10.1234/reference-with-doi", citing_entities[0]["id"])

        shutil.rmtree(citations_output_path)
        shutil.rmtree(self.output)

386 

387 

# Allow running this test module directly with `python <file>` instead of
# going through a test runner such as pytest.
if __name__ == '__main__':
    unittest.main()