Coverage for test/preprocess_input_test.py: 95%

257 statements  

coverage.py v6.5.0, created at 2025-12-20 08:55 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

import csv
import os
import shutil
import tempfile
import unittest
from unittest.mock import MagicMock, patch

import redis
from oc_meta.run.meta.preprocess_input import process_csv_file

class MockSPARQLResponse:
    def __init__(self, boolean_value):
        self.boolean_value = boolean_value

    def convert(self):
        return {'boolean': self.boolean_value}

class TestPreprocessInput(unittest.TestCase):
    def setUp(self):
        self.test_dir = tempfile.mkdtemp(dir='.')
        self.output_dir = tempfile.mkdtemp(dir='.')

        self.redis_client = redis.Redis(host='localhost', port=6381, db=5, decode_responses=True)

        # Add some test data to Redis
        self.redis_client.set('doi:10.1007/978-3-662-07918-8_3', '1')
        self.redis_client.set('doi:10.1016/0021-9991(73)90147-2', '1')
        self.redis_client.set('doi:10.1109/20.877674', '1')

        self.sparql_endpoint = "http://example.org/sparql"

        self.existing_dois_in_sparql = [
            'doi:10.1007/978-3-662-07918-8_3',
            'doi:10.1016/0021-9991(73)90147-2',
            'doi:10.1109/20.877674'
        ]

    def tearDown(self):
        shutil.rmtree(self.test_dir)
        shutil.rmtree(self.output_dir)
        self.redis_client.flushdb()
        self.redis_client.close()

    def mock_sparql_query(self, endpoint, query, id_str):
        """Mock for SPARQL query execution - check if ID exists in our test list"""
        if id_str in self.existing_dois_in_sparql:
            return MockSPARQLResponse(True)
        return MockSPARQLResponse(False)

    def test_process_real_metadata_redis(self):
        """Test processing metadata with Redis lookup"""
        real_data_path = os.path.join(self.test_dir, 'real_metadata.csv')

        # These DOIs exist in our Redis test DB
        real_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties, State, and Electrodes on Electric Strength","Ushakov, Vasily Y.","2004","Insulation of High-Voltage Equipment [isbn:9783642058530 isbn:9783662079188]",,,"27-82","book chapter","Springer Science and Business Media LLC [crossref:297]",
"doi:10.1016/0021-9991(73)90147-2","Flux-corrected transport. I. SHASTA, a fluid transport algorithm that works","Boris, Jay P; Book, David L","1973-01","Journal of Computational Physics [issn:0021-9991]","11","1","38-69","journal article","Elsevier BV [crossref:78]",
"doi:10.1109/20.877674","An investigation of FEM-FCT method for streamer corona simulation","Woong-Gee Min, ; Hyeong-Seok Kim, ; Seok-Hyun Lee, ; Song-Yop Hahn, ","2000-07","IEEE Transactions on Magnetics [issn:0018-9464]","36","4","1280-1284","journal article","Institute of Electrical and Electronics Engineers (IEEE) [crossref:263]",'''

        with open(real_data_path, 'w', encoding='utf-8') as f:
            f.write(real_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            real_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Since all DOIs exist in Redis, no file should be created
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 0)
        self.assertEqual(next_file_num, 0)
        self.assertEqual(stats.processed_rows, 0)
        self.assertEqual(stats.existing_ids_rows, 3)  # All 3 rows exist in Redis
        self.assertEqual(len(pending_rows), 0)  # No pending rows

    def test_process_real_metadata_sparql(self):
        """Test processing metadata with SPARQL lookup"""
        real_data_path = os.path.join(self.test_dir, 'real_metadata_sparql.csv')

        # These DOIs are configured to exist in our mocked SPARQL endpoint
        real_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties, State, and Electrodes on Electric Strength","Ushakov, Vasily Y.","2004","Insulation of High-Voltage Equipment [isbn:9783642058530 isbn:9783662079188]",,,"27-82","book chapter","Springer Science and Business Media LLC [crossref:297]",
"doi:10.1016/0021-9991(73)90147-2","Flux-corrected transport. I. SHASTA, a fluid transport algorithm that works","Boris, Jay P; Book, David L","1973-01","Journal of Computational Physics [issn:0021-9991]","11","1","38-69","journal article","Elsevier BV [crossref:78]",
"doi:10.1109/20.877674","An investigation of FEM-FCT method for streamer corona simulation","Woong-Gee Min, ; Hyeong-Seok Kim, ; Seok-Hyun Lee, ; Song-Yop Hahn, ","2000-07","IEEE Transactions on Magnetics [issn:0018-9464]","36","4","1280-1284","journal article","Institute of Electrical and Electronics Engineers (IEEE) [crossref:263]",'''

        with open(real_data_path, 'w', encoding='utf-8') as f:
            f.write(real_metadata)

        with patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql') as mock_check:
            def side_effect(ids, endpoint):
                if not ids:
                    return False

                id_list = ids.split()

                # We'll test both the scheme and value for each ID
                for id_str in id_list:
                    parts = id_str.split(":", 1)
                    scheme = parts[0]
                    value = parts[1]

                    # Make sure both scheme and value are extracted correctly
                    if scheme != "doi" or not value.startswith("10."):
                        return False

                    # Check if the full ID is in our list of valid IDs
                    if id_str not in self.existing_dois_in_sparql:
                        return False

                return True

            mock_check.side_effect = side_effect

            next_file_num, stats, pending_rows = process_csv_file(
                real_data_path,
                self.output_dir,
                0,
                storage_type='sparql',
                storage_reference=self.sparql_endpoint
            )

            # Since all DOIs are mocked to exist in SPARQL, no file should be created
            output_files = os.listdir(self.output_dir)
            self.assertEqual(len(output_files), 0)
            self.assertEqual(next_file_num, 0)
            self.assertEqual(stats.processed_rows, 0)
            self.assertEqual(stats.existing_ids_rows, 3)  # All 3 rows exist in mocked SPARQL
            self.assertEqual(len(pending_rows), 0)  # No pending rows

    def test_process_mixed_metadata_redis(self):
        """Test processing metadata with both existing and non-existing DOIs in Redis"""
        mixed_data_path = os.path.join(self.test_dir, 'mixed_metadata.csv')

        # Mix of existing DOIs, non-existing DOIs and empty IDs
        mixed_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties","Author 1","2004","Venue 1",,,"27-82","book chapter","Publisher 1",
"","Spatial Distribution of Ion Current","Author 2","2012-01","Venue 2","27","1","380-390","journal article","Publisher 2",
"doi:10.INVALID/123456789","Invalid DOI","Author 3","1980-01-14","Venue 3","13","1","3-6","journal article","Publisher 3",'''

        with open(mixed_data_path, 'w', encoding='utf-8') as f:
            f.write(mixed_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            mixed_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Should create one file with rows having empty IDs or non-existing DOIs
        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        # Verify stats
        self.assertEqual(stats.total_rows, 3)
        self.assertEqual(stats.existing_ids_rows, 1)  # One existing DOI
        self.assertEqual(stats.processed_rows, 2)  # Two rows should be processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 2)  # Should contain empty ID row and invalid DOI row
            self.assertTrue(any(row['id'] == '' for row in rows))
            self.assertTrue(any(row['id'] == 'doi:10.INVALID/123456789' for row in rows))

    @patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql')
    def test_process_mixed_metadata_sparql(self, mock_check):
        """Test processing metadata with both existing and non-existing DOIs in SPARQL"""
        # Mock the check_ids_existence_sparql function
        def side_effect(ids, endpoint):
            if not ids:
                return False

            id_list = ids.split()

            # We'll test both the scheme and value for each ID
            for id_str in id_list:
                parts = id_str.split(":", 1)
                scheme = parts[0]
                value = parts[1]

                # Make sure both scheme and value are extracted correctly
                if scheme != "doi" or not value.startswith("10."):
                    continue

                # Check if the full ID is in our list of valid IDs
                if id_str not in self.existing_dois_in_sparql:
                    return False

            return True

        mock_check.side_effect = side_effect

        mixed_data_path = os.path.join(self.test_dir, 'mixed_metadata_sparql.csv')

        # Mix of existing DOIs, non-existing DOIs and empty IDs
        mixed_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties","Author 1","2004","Venue 1",,,"27-82","book chapter","Publisher 1",
"","Spatial Distribution of Ion Current","Author 2","2012-01","Venue 2","27","1","380-390","journal article","Publisher 2",
"doi:10.INVALID/123456789","Invalid DOI","Author 3","1980-01-14","Venue 3","13","1","3-6","journal article","Publisher 3",'''

        with open(mixed_data_path, 'w', encoding='utf-8') as f:
            f.write(mixed_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            mixed_data_path,
            self.output_dir,
            0,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Should create one file with rows having empty IDs or non-existing DOIs
        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        # Verify stats
        self.assertEqual(stats.total_rows, 3)
        self.assertEqual(stats.existing_ids_rows, 1)  # One existing DOI
        self.assertEqual(stats.processed_rows, 2)  # Two rows should be processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 2)  # Should contain empty ID row and invalid DOI row
            self.assertTrue(any(row['id'] == '' for row in rows))
            self.assertTrue(any(row['id'] == 'doi:10.INVALID/123456789' for row in rows))

    def test_process_duplicate_rows(self):
        """Test that duplicate rows are properly filtered out"""
        test_data_path = os.path.join(self.test_dir, 'duplicate_data.csv')

        test_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        with open(test_data_path, 'w', encoding='utf-8') as f:
            f.write(test_data)

        next_file_num, stats, pending_rows = process_csv_file(
            test_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        self.assertEqual(stats.total_rows, 5)
        self.assertEqual(stats.duplicate_rows, 3)  # Three duplicate rows
        self.assertEqual(stats.processed_rows, 2)  # Two unique rows processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        self.assertEqual(len(rows), 2)
        unique_ids = set(row['id'] for row in rows)
        self.assertEqual(len(unique_ids), 2)
        self.assertIn('doi:10.INVALID/123', unique_ids)
        self.assertIn('doi:10.INVALID/456', unique_ids)

    def test_cross_file_deduplication_redis(self):
        """Test that duplicate rows are filtered across different files using Redis"""
        # Create first file with some data
        file1_path = os.path.join(self.test_dir, 'data1.csv')
        file1_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        # Create second file with some duplicates from first file
        file2_path = os.path.join(self.test_dir, 'data2.csv')
        file2_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/789","New Title","New Author","2024","Test Venue","1","1","21-30","journal article","Test Publisher",'''

        with open(file1_path, 'w', encoding='utf-8') as f:
            f.write(file1_data)
        with open(file2_path, 'w', encoding='utf-8') as f:
            f.write(file2_data)

        # Process both files using the same seen_rows set and pending_rows list
        seen_rows = set()
        pending_rows = []
        next_file_num, stats1, pending_rows = process_csv_file(
            file1_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )
        next_file_num, stats2, pending_rows = process_csv_file(
            file2_path,
            self.output_dir,
            next_file_num,
            storage_type='redis',
            storage_reference=self.redis_client,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )

        # Write final pending rows
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Verify statistics
        self.assertEqual(stats1.total_rows, 2)
        self.assertEqual(stats1.duplicate_rows, 0)
        self.assertEqual(stats1.processed_rows, 2)

        self.assertEqual(stats2.total_rows, 2)
        self.assertEqual(stats2.duplicate_rows, 1)  # One row should be detected as duplicate
        self.assertEqual(stats2.processed_rows, 1)  # Only one new row should be processed

        # Check output files
        output_files = sorted(os.listdir(self.output_dir))
        self.assertEqual(len(output_files), 1)  # Should create only one file

        # Verify final output contains only unique rows
        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 3)  # Should have 3 unique rows total
            unique_ids = set(row['id'] for row in rows)
            self.assertEqual(len(unique_ids), 3)
            self.assertIn('doi:10.INVALID/123', unique_ids)
            self.assertIn('doi:10.INVALID/456', unique_ids)
            self.assertIn('doi:10.INVALID/789', unique_ids)

    @patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql')
    def test_cross_file_deduplication_sparql(self, mock_check):
        """Test that duplicate rows are filtered across different files using SPARQL"""
        # Mock the check_ids_existence_sparql function
        def side_effect(ids, endpoint):
            if not ids:
                return False

            id_list = ids.split()

            # We'll test both the scheme and value for each ID
            for id_str in id_list:
                parts = id_str.split(":", 1)
                scheme = parts[0]
                value = parts[1]

                # Make sure both scheme and value are extracted correctly
                if scheme != "doi" or not value.startswith("10."):
                    continue

                # Check if the full ID is in our list of valid IDs
                if id_str not in self.existing_dois_in_sparql:
                    return False

            return True

        mock_check.side_effect = side_effect

        # Create first file with some data
        file1_path = os.path.join(self.test_dir, 'data1_sparql.csv')
        file1_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        # Create second file with some duplicates from first file
        file2_path = os.path.join(self.test_dir, 'data2_sparql.csv')
        file2_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/789","New Title","New Author","2024","Test Venue","1","1","21-30","journal article","Test Publisher",'''

        with open(file1_path, 'w', encoding='utf-8') as f:
            f.write(file1_data)
        with open(file2_path, 'w', encoding='utf-8') as f:
            f.write(file2_data)

        # Process both files using the same seen_rows set and pending_rows list
        seen_rows = set()
        pending_rows = []
        next_file_num, stats1, pending_rows = process_csv_file(
            file1_path,
            self.output_dir,
            0,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )
        next_file_num, stats2, pending_rows = process_csv_file(
            file2_path,
            self.output_dir,
            next_file_num,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )

        # Write final pending rows
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Verify statistics
        self.assertEqual(stats1.total_rows, 2)
        self.assertEqual(stats1.duplicate_rows, 0)
        self.assertEqual(stats1.processed_rows, 2)

        self.assertEqual(stats2.total_rows, 2)
        self.assertEqual(stats2.duplicate_rows, 1)  # One row should be detected as duplicate
        self.assertEqual(stats2.processed_rows, 1)  # Only one new row should be processed

        # Check output files
        output_files = sorted(os.listdir(self.output_dir))
        self.assertEqual(len(output_files), 1)  # Should create only one file

        # Verify final output contains only unique rows
        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 3)  # Should have 3 unique rows total
            unique_ids = set(row['id'] for row in rows)
            self.assertEqual(len(unique_ids), 3)
            self.assertIn('doi:10.INVALID/123', unique_ids)
            self.assertIn('doi:10.INVALID/456', unique_ids)
            self.assertIn('doi:10.INVALID/789', unique_ids)

if __name__ == '__main__':
    unittest.main()