Coverage for test/merge_csv_light_test.py: 99%

306 statements  

coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-

import csv
import multiprocessing
import os
import re
import shutil
import tempfile
import unittest

from oc_meta.run.merge_csv_dumps_light import (
    CSVDumpMergerLight, extract_omid_from_id_field, get_all_csv_files,
    merge_sorted_temp_files, normalize_row_data,
    process_csv_file_to_temp)
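
# Overview (inferred from the tests below; the implementation in
# oc_meta.run.merge_csv_dumps_light may differ in detail): each input CSV is
# written to a temporary file sorted by OMID (process_csv_file_to_temp); the
# sorted temporary files are then merged with a per-file priority flag
# (0 = new dump, 1 = existing dump) so that new rows win on duplicate OMIDs
# (merge_sorted_temp_files); the merged stream is written out in chunks named
# oc_meta_data_NNN.csv.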


class TestMergeCSVLight(unittest.TestCase):
    """Test suite for CSV Dump Merger Light functionality with streaming architecture"""
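
    # Fixture note (as asserted by the tests in this class): merging the CSV files in
    # merge_csv_light_test/existing_csv and merge_csv_light_test/new_csv is expected to
    # yield six rows, with new_csv taking precedence over existing_csv for shared OMIDs
    # such as omid:br/002, whose title contains "Updated" after the merge.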

    def setUp(self):
        """Set up test environment before each test"""
        self.base_dir = os.path.dirname(__file__)
        self.test_data_dir = os.path.join(self.base_dir, 'merge_csv_light_test')
        self.existing_dir = os.path.join(self.test_data_dir, 'existing_csv')
        self.new_dir = os.path.join(self.test_data_dir, 'new_csv')
        self.output_dir = os.path.join(self.test_data_dir, 'test_output')

        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)

        # Use a single worker by default for predictable results
        self.merger = CSVDumpMergerLight(max_workers=1)

    def tearDown(self):
        """Clean up after each test"""
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)

    def _run_merge_and_load_data(self, rows_per_file=2, max_workers=1):
        """Helper method to run merge and load all data for comprehensive testing"""
        merger = CSVDumpMergerLight(max_workers=max_workers)
        merger.merge_dumps_light(self.existing_dir, self.new_dir, self.output_dir, rows_per_file)

        output_files = sorted([f for f in os.listdir(self.output_dir) if f.endswith('.csv')])
        all_rows = []
        for file_name in output_files:
            file_path = os.path.join(self.output_dir, file_name)
            with open(file_path, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                all_rows.extend(list(reader))

        return output_files, all_rows

    def _run_minimal_memory_and_load_data(self, rows_per_file=2):
        """Helper method to run minimal memory merge and load all data"""
        self.merger.merge_dumps_minimal_memory(self.existing_dir, self.new_dir, self.output_dir, rows_per_file)

        output_files = sorted([f for f in os.listdir(self.output_dir) if f.endswith('.csv')])
        all_rows = []
        for file_name in output_files:
            file_path = os.path.join(self.output_dir, file_name)
            with open(file_path, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                all_rows.extend(list(reader))

        return output_files, all_rows

    def test_extract_omid_from_id_field(self):
        """Test OMID extraction from ID field"""
        # Test valid OMIDs
        self.assertEqual(extract_omid_from_id_field("omid:br/001 doi:123"), "omid:br/001")
        self.assertEqual(extract_omid_from_id_field("doi:123 omid:ra/456"), "omid:ra/456")

        # Test invalid cases
        self.assertEqual(extract_omid_from_id_field(""), "")
        self.assertEqual(extract_omid_from_id_field("doi:123"), "")
        self.assertEqual(extract_omid_from_id_field(None), "")

    def test_normalize_row_data(self):
        """Test row data normalization"""
        test_row = {
            'id': 'doi:123 omid:br/001',
            'author': 'John Doe [orcid:123 omid:ra/001]',
            'page': '333-333',
            'title': 'Test Article',
            'venue': 'Test Journal [omid:br/venue]'
        }

        normalized = normalize_row_data(test_row)

        # ID should have OMID first
        self.assertTrue(normalized['id'].startswith('omid:br/001'))

        # Page range should be simplified
        self.assertEqual(normalized['page'], '333')

        # Other fields should be normalized but not empty
        self.assertIsNotNone(normalized['author'])
        self.assertIsNotNone(normalized['venue'])

    def test_get_all_csv_files(self):
        """Test CSV file discovery"""
        existing_files = get_all_csv_files(self.existing_dir)
        new_files = get_all_csv_files(self.new_dir)

        self.assertGreater(len(existing_files), 0, "Should find existing CSV files")
        self.assertGreater(len(new_files), 0, "Should find new CSV files")

        # All files should end with .csv
        for file_path in existing_files + new_files:
            self.assertTrue(file_path.endswith('.csv'))

        # Test non-existent directory
        non_existent_files = get_all_csv_files('/non/existent/path')
        self.assertEqual(len(non_existent_files), 0)

    def test_process_csv_file_to_temp(self):
        """Test processing CSV file to temporary sorted file"""
        existing_files = get_all_csv_files(self.existing_dir)
        self.assertGreater(len(existing_files), 0, "Need files to test")

        with tempfile.TemporaryDirectory() as temp_dir:
            file_path, temp_file, row_count = process_csv_file_to_temp(
                (existing_files[0], temp_dir, False)
            )

            self.assertEqual(file_path, existing_files[0])
            self.assertTrue(os.path.exists(temp_file))
            self.assertGreater(row_count, 0)

            # Verify temp file content
            with open(temp_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                temp_rows = list(reader)
                self.assertEqual(len(temp_rows), row_count)

    def test_merge_sorted_temp_files(self):
        """Test merging sorted temporary files"""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create some test temp files
            temp_files = []

            # Create first temp file
            temp_file1 = tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                     dir=temp_dir, suffix='.csv', encoding='utf-8')
            fieldnames = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type', 'publisher', 'editor']
            writer1 = csv.DictWriter(temp_file1, fieldnames=fieldnames)
            writer1.writeheader()
            writer1.writerow({'id': 'omid:br/001', 'title': 'Article 1', 'author': 'Author 1', 'pub_date': '2021', 'venue': '', 'volume': '', 'issue': '', 'page': '', 'type': '', 'publisher': '', 'editor': ''})
            writer1.writerow({'id': 'omid:br/003', 'title': 'Article 3', 'author': 'Author 3', 'pub_date': '2021', 'venue': '', 'volume': '', 'issue': '', 'page': '', 'type': '', 'publisher': '', 'editor': ''})
            temp_file1.close()
            temp_files.append(temp_file1.name)

            # Create second temp file
            temp_file2 = tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                     dir=temp_dir, suffix='.csv', encoding='utf-8')
            writer2 = csv.DictWriter(temp_file2, fieldnames=fieldnames)
            writer2.writeheader()
            writer2.writerow({'id': 'omid:br/002', 'title': 'Article 2', 'author': 'Author 2', 'pub_date': '2021', 'venue': '', 'volume': '', 'issue': '', 'page': '', 'type': '', 'publisher': '', 'editor': ''})
            temp_file2.close()
            temp_files.append(temp_file2.name)

            # Merge files
            merge_output_dir = os.path.join(temp_dir, 'merge_output')
            temp_files_with_priority = [(f, 1) for f in temp_files]  # Default priority
            merge_sorted_temp_files(temp_files_with_priority, merge_output_dir, total_rows=3, rows_per_file=2)

            # Verify merged output
            self.assertTrue(os.path.exists(merge_output_dir))
            output_files = [f for f in os.listdir(merge_output_dir) if f.endswith('.csv')]
            self.assertGreater(len(output_files), 0)

            # Check that rows are properly sorted
            first_file = os.path.join(merge_output_dir, sorted(output_files)[0])
            with open(first_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                rows = list(reader)
                if len(rows) >= 2:
                    self.assertLessEqual(rows[0]['id'], rows[1]['id'], "Rows should be sorted by OMID")

    def test_initialization_with_workers(self):
        """Test CSVDumpMergerLight initialization with different worker counts"""
        # Test default initialization
        merger_default = CSVDumpMergerLight()
        self.assertEqual(merger_default.max_workers, multiprocessing.cpu_count())

        # Test custom worker count
        merger_custom = CSVDumpMergerLight(max_workers=2)
        self.assertEqual(merger_custom.max_workers, 2)

        # Test None (should default to CPU count)
        merger_none = CSVDumpMergerLight(max_workers=None)
        self.assertEqual(merger_none.max_workers, multiprocessing.cpu_count())

    def test_multiprocessing_functionality(self):
        """Test that multiprocessing works correctly with multiple workers"""
        # Test with multiple workers
        output_files_mp, all_rows_mp = self._run_merge_and_load_data(rows_per_file=2, max_workers=2)

        # Test with single worker for comparison
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)
        output_files_sp, all_rows_sp = self._run_merge_and_load_data(rows_per_file=2, max_workers=1)

        # Results should be identical regardless of worker count
        self.assertEqual(len(output_files_mp), len(output_files_sp))
        self.assertEqual(len(all_rows_mp), len(all_rows_sp))

        # Compare sorted data (order might differ due to parallel processing)
        mp_ids = sorted([row['id'] for row in all_rows_mp])
        sp_ids = sorted([row['id'] for row in all_rows_sp])
        self.assertEqual(mp_ids, sp_ids)

    def test_basic_functionality(self):
        """Test basic CSV merge functionality"""
        rows_per_file = 2

        self.merger.merge_dumps_light(self.existing_dir, self.new_dir, self.output_dir, rows_per_file)

        self.assertTrue(os.path.exists(self.output_dir))

        output_files = [f for f in os.listdir(self.output_dir) if f.endswith('.csv')]
        self.assertGreater(len(output_files), 0, "No output files were created")

        expected_files = ['oc_meta_data_001.csv', 'oc_meta_data_002.csv', 'oc_meta_data_003.csv']
        for expected_file in expected_files:
            self.assertIn(expected_file, output_files, f"Expected file {expected_file} not found")

        for filename in sorted(output_files):
            file_path = os.path.join(self.output_dir, filename)
            with open(file_path, 'r', encoding='utf-8') as f:
                lines = f.readlines()
                self.assertGreater(len(lines), 1, f"File {filename} should have header + data rows")

    def test_minimal_memory_mode(self):
        """Test minimal memory mode functionality"""
        rows_per_file = 2

        output_files_minimal, all_rows_minimal = self._run_minimal_memory_and_load_data(rows_per_file)

        # Reset and test regular mode
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)
        output_files_regular, all_rows_regular = self._run_merge_and_load_data(rows_per_file)

        # Results should be identical between minimal memory and regular modes
        self.assertEqual(len(output_files_minimal), len(output_files_regular))
        self.assertEqual(len(all_rows_minimal), len(all_rows_regular))

        # Compare sorted data
        minimal_ids = sorted([row['id'] for row in all_rows_minimal])
        regular_ids = sorted([row['id'] for row in all_rows_regular])
        self.assertEqual(minimal_ids, regular_ids)

    def test_omid_ordering_and_normalization(self):
        """Test OMID ordering and ID field normalization"""
        rows_per_file = 2
        self.merger.merge_dumps_light(self.existing_dir, self.new_dir, self.output_dir, rows_per_file)

        first_file = os.path.join(self.output_dir, 'oc_meta_data_001.csv')
        self.assertTrue(os.path.exists(first_file), "First output file should exist")

        with open(first_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
            self.assertGreater(len(lines), 1, "First file should have data rows")

            for i, line in enumerate(lines):
                if i == 0:  # Header
                    self.assertIn('id', line.lower(), "Header should contain 'id' field")
                else:
                    parts = line.split(',')
                    self.assertGreater(len(parts), 0, f"Row {i} should have CSV fields")

                    id_field = parts[0].strip('"')

                    id_parts = id_field.split()
                    omid_parts = [part for part in id_parts if part.startswith('omid:')]
                    self.assertGreater(len(omid_parts), 0, f"Row {i} should contain OMID")
                    self.assertTrue(id_parts[0].startswith('omid:'), f"Row {i}: OMID should be first in ID field")

    def test_file_structure(self):
        """Test that required test data files exist"""
        self.assertTrue(os.path.exists(self.existing_dir), f"Existing CSV directory should exist: {self.existing_dir}")
        self.assertTrue(os.path.exists(self.new_dir), f"New CSV directory should exist: {self.new_dir}")

        existing_files = [f for f in os.listdir(self.existing_dir) if f.endswith('.csv')]
        new_files = [f for f in os.listdir(self.new_dir) if f.endswith('.csv')]

        self.assertGreater(len(existing_files), 0, "Should have existing CSV files for testing")
        self.assertGreater(len(new_files), 0, "Should have new CSV files for testing")

    def test_merge_with_default_rows_per_file(self):
        """Test merge with default rows per file (3000)"""
        self.merger.merge_dumps_light(self.existing_dir, self.new_dir, self.output_dir)

        output_files = [f for f in os.listdir(self.output_dir) if f.endswith('.csv')]
        self.assertEqual(len(output_files), 1, "With default rows per file, should create only one output file")

        expected_file = 'oc_meta_data_001.csv'
        self.assertIn(expected_file, output_files, f"Should create {expected_file}")

    def test_omid_precedence(self):
        """Test OMID precedence (new files override existing)"""
        output_files, all_rows = self._run_merge_and_load_data()

        omid_002_row = next((row for row in all_rows if 'omid:br/002' in row['id']), None)
        self.assertIsNotNone(omid_002_row, "Should find OMID br/002 in output")

        self.assertIn("Updated", omid_002_row['title'], "New file should override existing file")

    def test_omid_alphabetical_ordering(self):
        """Test OMID alphabetical ordering"""
        output_files, all_rows = self._run_merge_and_load_data()

        omids_found = []
        for row in all_rows:
            omid = None
            for id_part in row['id'].split():
                if id_part.startswith('omid:'):
                    omid = id_part
                    break
            if omid:
                omids_found.append(omid)

        sorted_omids = sorted(omids_found)
        self.assertEqual(omids_found, sorted_omids, "OMIDs should be sorted alphabetically")

    def test_id_field_normalization(self):
        """Test ID field normalization (OMID first, others sorted)"""
        output_files, all_rows = self._run_merge_and_load_data()

        for i, row in enumerate(all_rows[:3]):  # Check first 3 rows
            id_field = row['id']
            id_parts = id_field.split()
            omid_parts = [part for part in id_parts if part.startswith('omid:')]
            other_parts = [part for part in id_parts if not part.startswith('omid:')]

            self.assertGreater(len(omid_parts), 0, f"Row {i+1} should have OMID")
            self.assertIn(id_parts[0], omid_parts, f"Row {i+1}: OMID should be first")
            self.assertEqual(sorted(other_parts), other_parts, f"Row {i+1}: Other IDs should be sorted")

    def test_page_field_normalization(self):
        """Test page field normalization (333-333 -> 333)"""
        output_files, all_rows = self._run_merge_and_load_data()

        page_333_row = next((row for row in all_rows if row['page'] == '333'), None)
        self.assertIsNotNone(page_333_row, "Should find normalized page field '333'")

    def test_people_field_id_normalization(self):
        """Test people field ID normalization (OMID first in brackets)"""
        output_files, all_rows = self._run_merge_and_load_data()

        for i, row in enumerate(all_rows[:2]):  # Check first 2 rows
            author_field = row['author']
            # Check if OMIDs come before other IDs in brackets
            if '[omid:ra/' in author_field and 'orcid:' in author_field:
                brackets = re.findall(r'\[([^\]]+)\]', author_field)
                for bracket_content in brackets:
                    ids_in_bracket = bracket_content.split()
                    omid_ids = [id for id in ids_in_bracket if id.startswith('omid:')]

                    if omid_ids:
                        self.assertIn(ids_in_bracket[0], omid_ids, f"OMID should be first in brackets: {bracket_content}")

    def test_progressive_file_naming(self):
        """Test progressive file naming"""
        output_files, all_rows = self._run_merge_and_load_data()

        expected_names = ['oc_meta_data_001.csv', 'oc_meta_data_002.csv', 'oc_meta_data_003.csv']
        for expected_name in expected_names:
            self.assertIn(expected_name, output_files, f"Expected file {expected_name} should exist")

    def test_file_row_counts(self):
        """Test file row counts and total rows"""
        output_files, all_rows = self._run_merge_and_load_data()

        total_rows = 0
        for file_name in output_files:
            file_path = os.path.join(self.output_dir, file_name)
            with open(file_path, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                row_count = len(list(reader))
                total_rows += row_count

        # Verify we have the expected total number of rows (6 from test data)
        self.assertEqual(total_rows, 6, "Should have 6 total rows from test data")
        self.assertEqual(len(all_rows), 6, "Loaded rows should match total count")

    def test_data_consistency(self):
        """Test data consistency and completeness"""
        output_files, all_rows = self._run_merge_and_load_data()

        # Verify all rows have required fields
        required_fields = ['id', 'title', 'author', 'pub_date', 'venue']
        for i, row in enumerate(all_rows):
            for field in required_fields:
                self.assertIn(field, row, f"Row {i+1} should have field '{field}'")

        # Verify no duplicate OMIDs
        omids = []
        for row in all_rows:
            for id_part in row['id'].split():
                if id_part.startswith('omid:'):
                    omids.append(id_part)
                    break

        unique_omids = set(omids)
        self.assertEqual(len(omids), len(unique_omids), "Should have no duplicate OMIDs")

    def test_error_handling_with_multiprocessing(self):
        """Test error handling with multiprocessing (non-existent directories)"""
        non_existent_dir = os.path.join(self.test_data_dir, 'non_existent')

        # Should handle gracefully without crashing
        merger = CSVDumpMergerLight(max_workers=2)
        merger.merge_dumps_light(non_existent_dir, self.new_dir, self.output_dir)

        # Should still process new_dir files
        self.assertTrue(os.path.exists(self.output_dir))
        output_files = [f for f in os.listdir(self.output_dir) if f.endswith('.csv')]
        self.assertGreater(len(output_files), 0, "Should create files from new_dir despite missing existing_dir")

    def test_streaming_vs_minimal_memory_consistency(self):
        """Test that streaming and minimal memory modes produce identical results"""
        # Run streaming mode
        output_files_streaming, all_rows_streaming = self._run_merge_and_load_data(rows_per_file=2)

        # Reset and run minimal memory mode
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)
        output_files_minimal, all_rows_minimal = self._run_minimal_memory_and_load_data(rows_per_file=2)

        # Compare results
        self.assertEqual(len(all_rows_streaming), len(all_rows_minimal), "Both modes should produce same number of rows")

        # Compare content (sort to handle potential ordering differences)
        streaming_content = sorted([row['id'] + '|' + row['title'] for row in all_rows_streaming])
        minimal_content = sorted([row['id'] + '|' + row['title'] for row in all_rows_minimal])
        self.assertEqual(streaming_content, minimal_content, "Both modes should produce identical content")

    def test_deduplication_functionality(self):
        """Test that duplicate OMIDs are properly handled"""
        output_files, all_rows = self._run_merge_and_load_data()

        # Extract all OMIDs
        omids_found = []
        for row in all_rows:
            for id_part in row['id'].split():
                if id_part.startswith('omid:'):
                    omids_found.append(id_part)
                    break

        # Check for duplicates
        unique_omids = set(omids_found)
        self.assertEqual(len(omids_found), len(unique_omids),
                         f"Found duplicate OMIDs: {[omid for omid in omids_found if omids_found.count(omid) > 1]}")

    def test_priority_handling_with_temp_files(self):
        """Test priority handling in merge_sorted_temp_files with per-file priorities"""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create test temp files with different priorities
            temp_files_with_priority = []

            # Create higher priority file (0 = new file)
            temp_file1 = tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                     dir=temp_dir, suffix='.csv', encoding='utf-8')
            fieldnames = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type', 'publisher', 'editor']
            writer1 = csv.DictWriter(temp_file1, fieldnames=fieldnames)
            writer1.writeheader()
            writer1.writerow({'id': 'omid:br/002', 'title': 'Updated Article', 'author': 'New Author', 'pub_date': '2023', 'venue': '', 'volume': '', 'issue': '', 'page': '', 'type': '', 'publisher': '', 'editor': ''})
            temp_file1.close()
            temp_files_with_priority.append((temp_file1.name, 0))  # Higher priority (new file)

            # Create lower priority file (1 = existing file)
            temp_file2 = tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                     dir=temp_dir, suffix='.csv', encoding='utf-8')
            writer2 = csv.DictWriter(temp_file2, fieldnames=fieldnames)
            writer2.writeheader()
            writer2.writerow({'id': 'omid:br/002', 'title': 'Original Article', 'author': 'Old Author', 'pub_date': '2022', 'venue': '', 'volume': '', 'issue': '', 'page': '', 'type': '', 'publisher': '', 'editor': ''})
            temp_file2.close()
            temp_files_with_priority.append((temp_file2.name, 1))  # Lower priority (existing file)

            # Merge files
            merge_output_dir = os.path.join(temp_dir, 'merge_output')
            merge_sorted_temp_files(temp_files_with_priority, merge_output_dir, total_rows=2, rows_per_file=10)

            # Verify that higher priority (new file) won
            output_files = [f for f in os.listdir(merge_output_dir) if f.endswith('.csv')]
            self.assertGreater(len(output_files), 0)

            first_file = os.path.join(merge_output_dir, sorted(output_files)[0])
            with open(first_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                rows = list(reader)

                # Should only have one row (the higher priority one)
                self.assertEqual(len(rows), 1)
                # Should be the updated article (from higher priority file)
                self.assertIn('Updated', rows[0]['title'])
                self.assertIn('New Author', rows[0]['author'])


if __name__ == '__main__':
    unittest.main(verbosity=2)