Coverage for test/preprocess_input_test.py: 95%

262 statements  

coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

import csv
import os
import shutil
import tempfile
import unittest
from unittest.mock import MagicMock, patch

import redis
from oc_meta.run.meta.preprocess_input import process_csv_file


class MockSPARQLResponse:
    def __init__(self, boolean_value):
        self.boolean_value = boolean_value

    def convert(self):
        return {'boolean': self.boolean_value}

class TestPreprocessInput(unittest.TestCase):
    def setUp(self):
        self.test_dir = tempfile.mkdtemp(dir='.')
        self.output_dir = tempfile.mkdtemp(dir='.')

        # Create Redis connection for testing (using DB 5)
        self.redis_client = redis.Redis(host='localhost', port=6379, db=5, decode_responses=True)

        # Add some test data to Redis
        self.redis_client.set('doi:10.1007/978-3-662-07918-8_3', '1')
        self.redis_client.set('doi:10.1016/0021-9991(73)90147-2', '1')
        self.redis_client.set('doi:10.1109/20.877674', '1')

        self.sparql_endpoint = "http://example.org/sparql"

        self.existing_dois_in_sparql = [
            'doi:10.1007/978-3-662-07918-8_3',
            'doi:10.1016/0021-9991(73)90147-2',
            'doi:10.1109/20.877674'
        ]

    def tearDown(self):
        shutil.rmtree(self.test_dir)
        shutil.rmtree(self.output_dir)
        self.redis_client.flushdb()
        self.redis_client.close()

    def mock_sparql_query(self, endpoint, query, id_str):
        """Mock for SPARQL query execution - check if the ID exists in our test list"""
        if id_str in self.existing_dois_in_sparql:
            return MockSPARQLResponse(True)
        return MockSPARQLResponse(False)

    def test_process_real_metadata_redis(self):
        """Test processing metadata with Redis lookup"""
        real_data_path = os.path.join(self.test_dir, 'real_metadata.csv')

        # These DOIs exist in our Redis test DB
        real_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties, State, and Electrodes on Electric Strength","Ushakov, Vasily Y.","2004","Insulation of High-Voltage Equipment [isbn:9783642058530 isbn:9783662079188]",,,"27-82","book chapter","Springer Science and Business Media LLC [crossref:297]",
"doi:10.1016/0021-9991(73)90147-2","Flux-corrected transport. I. SHASTA, a fluid transport algorithm that works","Boris, Jay P; Book, David L","1973-01","Journal of Computational Physics [issn:0021-9991]","11","1","38-69","journal article","Elsevier BV [crossref:78]",
"doi:10.1109/20.877674","An investigation of FEM-FCT method for streamer corona simulation","Woong-Gee Min, ; Hyeong-Seok Kim, ; Seok-Hyun Lee, ; Song-Yop Hahn, ","2000-07","IEEE Transactions on Magnetics [issn:0018-9464]","36","4","1280-1284","journal article","Institute of Electrical and Electronics Engineers (IEEE) [crossref:263]",'''

        with open(real_data_path, 'w', encoding='utf-8') as f:
            f.write(real_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            real_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Since all DOIs exist in Redis, no file should be created
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 0)
        self.assertEqual(next_file_num, 0)
        self.assertEqual(stats.processed_rows, 0)
        self.assertEqual(stats.existing_ids_rows, 3)  # All 3 rows exist in Redis
        self.assertEqual(len(pending_rows), 0)  # No pending rows

    @patch('oc_meta.run.meta.preprocess_input.SPARQLWrapper')
    def test_process_real_metadata_sparql(self, mock_sparql_wrapper):
        """Test processing metadata with SPARQL lookup"""
        mock_instance = MagicMock()
        mock_sparql_wrapper.return_value = mock_instance

        mock_response = MockSPARQLResponse(True)
        mock_instance.query.return_value = mock_response

        real_data_path = os.path.join(self.test_dir, 'real_metadata_sparql.csv')

        # These DOIs are configured to exist in our mocked SPARQL endpoint
        real_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties, State, and Electrodes on Electric Strength","Ushakov, Vasily Y.","2004","Insulation of High-Voltage Equipment [isbn:9783642058530 isbn:9783662079188]",,,"27-82","book chapter","Springer Science and Business Media LLC [crossref:297]",
"doi:10.1016/0021-9991(73)90147-2","Flux-corrected transport. I. SHASTA, a fluid transport algorithm that works","Boris, Jay P; Book, David L","1973-01","Journal of Computational Physics [issn:0021-9991]","11","1","38-69","journal article","Elsevier BV [crossref:78]",
"doi:10.1109/20.877674","An investigation of FEM-FCT method for streamer corona simulation","Woong-Gee Min, ; Hyeong-Seok Kim, ; Seok-Hyun Lee, ; Song-Yop Hahn, ","2000-07","IEEE Transactions on Magnetics [issn:0018-9464]","36","4","1280-1284","journal article","Institute of Electrical and Electronics Engineers (IEEE) [crossref:263]",'''

        with open(real_data_path, 'w', encoding='utf-8') as f:
            f.write(real_metadata)

        with patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql') as mock_check:
            def side_effect(ids, endpoint):
                if not ids:
                    return False

                id_list = ids.split()

                # We'll test both the scheme and value for each ID
                for id_str in id_list:
                    parts = id_str.split(":", 1)
                    scheme = parts[0]
                    value = parts[1]

                    # Make sure both scheme and value are extracted correctly
                    if scheme != "doi" or not value.startswith("10."):
                        return False

                    # Check if the full ID is in our list of valid IDs
                    if id_str not in self.existing_dois_in_sparql:
                        return False

                return True

            mock_check.side_effect = side_effect

            next_file_num, stats, pending_rows = process_csv_file(
                real_data_path,
                self.output_dir,
                0,
                storage_type='sparql',
                storage_reference=self.sparql_endpoint
            )

        # Since all DOIs are mocked to exist in SPARQL, no file should be created
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 0)
        self.assertEqual(next_file_num, 0)
        self.assertEqual(stats.processed_rows, 0)
        self.assertEqual(stats.existing_ids_rows, 3)  # All 3 rows exist in mocked SPARQL
        self.assertEqual(len(pending_rows), 0)  # No pending rows

    def test_process_mixed_metadata_redis(self):
        """Test processing metadata with both existing and non-existing DOIs in Redis"""
        mixed_data_path = os.path.join(self.test_dir, 'mixed_metadata.csv')

        # Mix of existing DOIs, non-existing DOIs and empty IDs
        mixed_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties","Author 1","2004","Venue 1",,,"27-82","book chapter","Publisher 1",
"","Spatial Distribution of Ion Current","Author 2","2012-01","Venue 2","27","1","380-390","journal article","Publisher 2",
"doi:10.INVALID/123456789","Invalid DOI","Author 3","1980-01-14","Venue 3","13","1","3-6","journal article","Publisher 3",'''

        with open(mixed_data_path, 'w', encoding='utf-8') as f:
            f.write(mixed_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            mixed_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Should create one file with rows having empty IDs or non-existing DOIs
        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        # Verify stats
        self.assertEqual(stats.total_rows, 3)
        self.assertEqual(stats.existing_ids_rows, 1)  # One existing DOI
        self.assertEqual(stats.processed_rows, 2)  # Two rows should be processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 2)  # Should contain empty ID row and invalid DOI row
            self.assertTrue(any(row['id'] == '' for row in rows))
            self.assertTrue(any(row['id'] == 'doi:10.INVALID/123456789' for row in rows))

    @patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql')
    def test_process_mixed_metadata_sparql(self, mock_check):
        """Test processing metadata with both existing and non-existing DOIs in SPARQL"""
        # Mock the check_ids_existence_sparql function
        def side_effect(ids, endpoint):
            if not ids:
                return False

            id_list = ids.split()

            # We'll test both the scheme and value for each ID
            for id_str in id_list:
                parts = id_str.split(":", 1)
                scheme = parts[0]
                value = parts[1]

                # Make sure both scheme and value are extracted correctly
                if scheme != "doi" or not value.startswith("10."):
                    continue

                # Check if the full ID is in our list of valid IDs
                if id_str not in self.existing_dois_in_sparql:
                    return False

            return True

        mock_check.side_effect = side_effect

        mixed_data_path = os.path.join(self.test_dir, 'mixed_metadata_sparql.csv')

        # Mix of existing DOIs, non-existing DOIs and empty IDs
        mixed_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties","Author 1","2004","Venue 1",,,"27-82","book chapter","Publisher 1",
"","Spatial Distribution of Ion Current","Author 2","2012-01","Venue 2","27","1","380-390","journal article","Publisher 2",
"doi:10.INVALID/123456789","Invalid DOI","Author 3","1980-01-14","Venue 3","13","1","3-6","journal article","Publisher 3",'''

        with open(mixed_data_path, 'w', encoding='utf-8') as f:
            f.write(mixed_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            mixed_data_path,
            self.output_dir,
            0,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Should create one file with rows having empty IDs or non-existing DOIs
        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        # Verify stats
        self.assertEqual(stats.total_rows, 3)
        self.assertEqual(stats.existing_ids_rows, 1)  # One existing DOI
        self.assertEqual(stats.processed_rows, 2)  # Two rows should be processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 2)  # Should contain empty ID row and invalid DOI row
            self.assertTrue(any(row['id'] == '' for row in rows))
            self.assertTrue(any(row['id'] == 'doi:10.INVALID/123456789' for row in rows))

    def test_process_duplicate_rows(self):
        """Test that duplicate rows are properly filtered out"""
        test_data_path = os.path.join(self.test_dir, 'duplicate_data.csv')

        test_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        with open(test_data_path, 'w', encoding='utf-8') as f:
            f.write(test_data)

        next_file_num, stats, pending_rows = process_csv_file(
            test_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        self.assertEqual(stats.total_rows, 5)
        self.assertEqual(stats.duplicate_rows, 3)  # Three duplicate rows
        self.assertEqual(stats.processed_rows, 2)  # Two unique rows processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        self.assertEqual(len(rows), 2)
        unique_ids = set(row['id'] for row in rows)
        self.assertEqual(len(unique_ids), 2)
        self.assertIn('doi:10.INVALID/123', unique_ids)
        self.assertIn('doi:10.INVALID/456', unique_ids)

    def test_cross_file_deduplication_redis(self):
        """Test that duplicate rows are filtered across different files using Redis"""
        # Create first file with some data
        file1_path = os.path.join(self.test_dir, 'data1.csv')
        file1_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        # Create second file with some duplicates from first file
        file2_path = os.path.join(self.test_dir, 'data2.csv')
        file2_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/789","New Title","New Author","2024","Test Venue","1","1","21-30","journal article","Test Publisher",'''

        with open(file1_path, 'w', encoding='utf-8') as f:
            f.write(file1_data)
        with open(file2_path, 'w', encoding='utf-8') as f:
            f.write(file2_data)

        # Process both files using the same seen_rows set and pending_rows list
        seen_rows = set()
        pending_rows = []
        next_file_num, stats1, pending_rows = process_csv_file(
            file1_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )
        next_file_num, stats2, pending_rows = process_csv_file(
            file2_path,
            self.output_dir,
            next_file_num,
            storage_type='redis',
            storage_reference=self.redis_client,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )

        # Write final pending rows
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Verify statistics
        self.assertEqual(stats1.total_rows, 2)
        self.assertEqual(stats1.duplicate_rows, 0)
        self.assertEqual(stats1.processed_rows, 2)

        self.assertEqual(stats2.total_rows, 2)
        self.assertEqual(stats2.duplicate_rows, 1)  # One row should be detected as duplicate
        self.assertEqual(stats2.processed_rows, 1)  # Only one new row should be processed

        # Check output files
        output_files = sorted(os.listdir(self.output_dir))
        self.assertEqual(len(output_files), 1)  # Should create only one file

        # Verify final output contains only unique rows
        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 3)  # Should have 3 unique rows total
            unique_ids = set(row['id'] for row in rows)
            self.assertEqual(len(unique_ids), 3)
            self.assertIn('doi:10.INVALID/123', unique_ids)
            self.assertIn('doi:10.INVALID/456', unique_ids)
            self.assertIn('doi:10.INVALID/789', unique_ids)

    @patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql')
    def test_cross_file_deduplication_sparql(self, mock_check):
        """Test that duplicate rows are filtered across different files using SPARQL"""
        # Mock the check_ids_existence_sparql function
        def side_effect(ids, endpoint):
            if not ids:
                return False

            id_list = ids.split()

            # We'll test both the scheme and value for each ID
            for id_str in id_list:
                parts = id_str.split(":", 1)
                scheme = parts[0]
                value = parts[1]

                # Make sure both scheme and value are extracted correctly
                if scheme != "doi" or not value.startswith("10."):
                    continue

                # Check if the full ID is in our list of valid IDs
                if id_str not in self.existing_dois_in_sparql:
                    return False

            return True

        mock_check.side_effect = side_effect

        # Create first file with some data
        file1_path = os.path.join(self.test_dir, 'data1_sparql.csv')
        file1_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        # Create second file with some duplicates from first file
        file2_path = os.path.join(self.test_dir, 'data2_sparql.csv')
        file2_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/789","New Title","New Author","2024","Test Venue","1","1","21-30","journal article","Test Publisher",'''

        with open(file1_path, 'w', encoding='utf-8') as f:
            f.write(file1_data)
        with open(file2_path, 'w', encoding='utf-8') as f:
            f.write(file2_data)

        # Process both files using the same seen_rows set and pending_rows list
        seen_rows = set()
        pending_rows = []
        next_file_num, stats1, pending_rows = process_csv_file(
            file1_path,
            self.output_dir,
            0,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )
        next_file_num, stats2, pending_rows = process_csv_file(
            file2_path,
            self.output_dir,
            next_file_num,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )

        # Write final pending rows
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Verify statistics
        self.assertEqual(stats1.total_rows, 2)
        self.assertEqual(stats1.duplicate_rows, 0)
        self.assertEqual(stats1.processed_rows, 2)

        self.assertEqual(stats2.total_rows, 2)
        self.assertEqual(stats2.duplicate_rows, 1)  # One row should be detected as duplicate
        self.assertEqual(stats2.processed_rows, 1)  # Only one new row should be processed

        # Check output files
        output_files = sorted(os.listdir(self.output_dir))
        self.assertEqual(len(output_files), 1)  # Should create only one file

        # Verify final output contains only unique rows
        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 3)  # Should have 3 unique rows total
            unique_ids = set(row['id'] for row in rows)
            self.assertEqual(len(unique_ids), 3)
            self.assertIn('doi:10.INVALID/123', unique_ids)
            self.assertIn('doi:10.INVALID/456', unique_ids)
            self.assertIn('doi:10.INVALID/789', unique_ids)

if __name__ == '__main__':
    unittest.main()