Coverage for test/merge_csv_light_test.py: 99%
306 statements
coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

#!/usr/bin/python
# -*- coding: utf-8 -*-

import csv
import multiprocessing
import os
import re
import shutil
import tempfile
import unittest

from oc_meta.run.merge_csv_dumps_light import (
    CSVDumpMergerLight, extract_omid_from_id_field, get_all_csv_files,
    merge_sorted_temp_files, normalize_row_data,
    process_csv_file_to_temp)


class TestMergeCSVLight(unittest.TestCase):
    """Test suite for CSV Dump Merger Light functionality with streaming architecture"""

    def setUp(self):
        """Set up test environment before each test"""
        self.base_dir = os.path.dirname(__file__)
        self.test_data_dir = os.path.join(self.base_dir, 'merge_csv_light_test')
        self.existing_dir = os.path.join(self.test_data_dir, 'existing_csv')
        self.new_dir = os.path.join(self.test_data_dir, 'new_csv')
        self.output_dir = os.path.join(self.test_data_dir, 'test_output')

        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)

        # Use a single worker by default for predictable results
        self.merger = CSVDumpMergerLight(max_workers=1)

    def tearDown(self):
        """Clean up after each test"""
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)

    def _run_merge_and_load_data(self, rows_per_file=2, max_workers=1):
        """Helper method to run merge and load all data for comprehensive testing"""
        merger = CSVDumpMergerLight(max_workers=max_workers)
        merger.merge_dumps_light(self.existing_dir, self.new_dir, self.output_dir, rows_per_file)

        output_files = sorted([f for f in os.listdir(self.output_dir) if f.endswith('.csv')])
        all_rows = []
        for file_name in output_files:
            file_path = os.path.join(self.output_dir, file_name)
            with open(file_path, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                all_rows.extend(list(reader))

        return output_files, all_rows

    def _run_minimal_memory_and_load_data(self, rows_per_file=2):
        """Helper method to run minimal memory merge and load all data"""
        self.merger.merge_dumps_minimal_memory(self.existing_dir, self.new_dir, self.output_dir, rows_per_file)

        output_files = sorted([f for f in os.listdir(self.output_dir) if f.endswith('.csv')])
        all_rows = []
        for file_name in output_files:
            file_path = os.path.join(self.output_dir, file_name)
            with open(file_path, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                all_rows.extend(list(reader))

        return output_files, all_rows

    def test_extract_omid_from_id_field(self):
        """Test OMID extraction from ID field"""
        # Test valid OMIDs
        self.assertEqual(extract_omid_from_id_field("omid:br/001 doi:123"), "omid:br/001")
        self.assertEqual(extract_omid_from_id_field("doi:123 omid:ra/456"), "omid:ra/456")

        # Test invalid cases
        self.assertEqual(extract_omid_from_id_field(""), "")
        self.assertEqual(extract_omid_from_id_field("doi:123"), "")
        self.assertEqual(extract_omid_from_id_field(None), "")

    def test_normalize_row_data(self):
        """Test row data normalization"""
        test_row = {
            'id': 'doi:123 omid:br/001',
            'author': 'John Doe [orcid:123 omid:ra/001]',
            'page': '333-333',
            'title': 'Test Article',
            'venue': 'Test Journal [omid:br/venue]'
        }

        normalized = normalize_row_data(test_row)

        # ID should have OMID first
        self.assertTrue(normalized['id'].startswith('omid:br/001'))

        # Page range should be simplified
        self.assertEqual(normalized['page'], '333')

        # Other fields should be normalized but not empty
        self.assertIsNotNone(normalized['author'])
        self.assertIsNotNone(normalized['venue'])

    def test_get_all_csv_files(self):
        """Test CSV file discovery"""
        existing_files = get_all_csv_files(self.existing_dir)
        new_files = get_all_csv_files(self.new_dir)

        self.assertGreater(len(existing_files), 0, "Should find existing CSV files")
        self.assertGreater(len(new_files), 0, "Should find new CSV files")

        # All files should end with .csv
        for file_path in existing_files + new_files:
            self.assertTrue(file_path.endswith('.csv'))

        # Test non-existent directory
        non_existent_files = get_all_csv_files('/non/existent/path')
        self.assertEqual(len(non_existent_files), 0)

    def test_process_csv_file_to_temp(self):
        """Test processing CSV file to temporary sorted file"""
        existing_files = get_all_csv_files(self.existing_dir)
        self.assertGreater(len(existing_files), 0, "Need files to test")

        with tempfile.TemporaryDirectory() as temp_dir:
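            # Note: arguments are passed as a single tuple (csv_path, temp_dir, flag);
            # the boolean presumably marks new-dump files, hence False for this
            # existing-dump file (assumption, not confirmed by this listing).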
            file_path, temp_file, row_count = process_csv_file_to_temp(
                (existing_files[0], temp_dir, False)
            )

            self.assertEqual(file_path, existing_files[0])
            self.assertTrue(os.path.exists(temp_file))
            self.assertGreater(row_count, 0)

            # Verify temp file content
            with open(temp_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                temp_rows = list(reader)
                self.assertEqual(len(temp_rows), row_count)

    def test_merge_sorted_temp_files(self):
        """Test merging sorted temporary files"""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create some test temp files
            temp_files = []

            # Create first temp file
            temp_file1 = tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                     dir=temp_dir, suffix='.csv', encoding='utf-8')
            fieldnames = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type', 'publisher', 'editor']
            writer1 = csv.DictWriter(temp_file1, fieldnames=fieldnames)
            writer1.writeheader()
            writer1.writerow({'id': 'omid:br/001', 'title': 'Article 1', 'author': 'Author 1', 'pub_date': '2021', 'venue': '', 'volume': '', 'issue': '', 'page': '', 'type': '', 'publisher': '', 'editor': ''})
            writer1.writerow({'id': 'omid:br/003', 'title': 'Article 3', 'author': 'Author 3', 'pub_date': '2021', 'venue': '', 'volume': '', 'issue': '', 'page': '', 'type': '', 'publisher': '', 'editor': ''})
            temp_file1.close()
            temp_files.append(temp_file1.name)

            # Create second temp file
            temp_file2 = tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                     dir=temp_dir, suffix='.csv', encoding='utf-8')
            writer2 = csv.DictWriter(temp_file2, fieldnames=fieldnames)
            writer2.writeheader()
            writer2.writerow({'id': 'omid:br/002', 'title': 'Article 2', 'author': 'Author 2', 'pub_date': '2021', 'venue': '', 'volume': '', 'issue': '', 'page': '', 'type': '', 'publisher': '', 'editor': ''})
            temp_file2.close()
            temp_files.append(temp_file2.name)

            # Merge files
            merge_output_dir = os.path.join(temp_dir, 'merge_output')
            temp_files_with_priority = [(f, 1) for f in temp_files]  # Default priority
            merge_sorted_temp_files(temp_files_with_priority, merge_output_dir, total_rows=3, rows_per_file=2)

            # Verify merged output
            self.assertTrue(os.path.exists(merge_output_dir))
            output_files = [f for f in os.listdir(merge_output_dir) if f.endswith('.csv')]
            self.assertGreater(len(output_files), 0)

            # Check that rows are properly sorted
            first_file = os.path.join(merge_output_dir, sorted(output_files)[0])
            with open(first_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                rows = list(reader)
                if len(rows) >= 2:
                    self.assertLessEqual(rows[0]['id'], rows[1]['id'], "Rows should be sorted by OMID")

    def test_initialization_with_workers(self):
        """Test CSVDumpMergerLight initialization with different worker counts"""
        # Test default initialization
        merger_default = CSVDumpMergerLight()
        self.assertEqual(merger_default.max_workers, multiprocessing.cpu_count())

        # Test custom worker count
        merger_custom = CSVDumpMergerLight(max_workers=2)
        self.assertEqual(merger_custom.max_workers, 2)

        # Test None (should default to CPU count)
        merger_none = CSVDumpMergerLight(max_workers=None)
        self.assertEqual(merger_none.max_workers, multiprocessing.cpu_count())

    def test_multiprocessing_functionality(self):
        """Test that multiprocessing works correctly with multiple workers"""
        # Test with multiple workers
        output_files_mp, all_rows_mp = self._run_merge_and_load_data(rows_per_file=2, max_workers=2)

        # Test with single worker for comparison
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)
        output_files_sp, all_rows_sp = self._run_merge_and_load_data(rows_per_file=2, max_workers=1)

        # Results should be identical regardless of worker count
        self.assertEqual(len(output_files_mp), len(output_files_sp))
        self.assertEqual(len(all_rows_mp), len(all_rows_sp))

        # Compare sorted data (order might differ due to parallel processing)
        mp_ids = sorted([row['id'] for row in all_rows_mp])
        sp_ids = sorted([row['id'] for row in all_rows_sp])
        self.assertEqual(mp_ids, sp_ids)

    def test_basic_functionality(self):
        """Test basic CSV merge functionality"""
        rows_per_file = 2

        self.merger.merge_dumps_light(self.existing_dir, self.new_dir, self.output_dir, rows_per_file)

        self.assertTrue(os.path.exists(self.output_dir))

        output_files = [f for f in os.listdir(self.output_dir) if f.endswith('.csv')]
        self.assertGreater(len(output_files), 0, "No output files were created")

        expected_files = ['oc_meta_data_001.csv', 'oc_meta_data_002.csv', 'oc_meta_data_003.csv']
        for expected_file in expected_files:
            self.assertIn(expected_file, output_files, f"Expected file {expected_file} not found")

        for filename in sorted(output_files):
            file_path = os.path.join(self.output_dir, filename)
            with open(file_path, 'r', encoding='utf-8') as f:
                lines = f.readlines()
                self.assertGreater(len(lines), 1, f"File {filename} should have header + data rows")

    def test_minimal_memory_mode(self):
        """Test minimal memory mode functionality"""
        rows_per_file = 2

        output_files_minimal, all_rows_minimal = self._run_minimal_memory_and_load_data(rows_per_file)

        # Reset and test regular mode
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)
        output_files_regular, all_rows_regular = self._run_merge_and_load_data(rows_per_file)

        # Results should be identical between minimal memory and regular modes
        self.assertEqual(len(output_files_minimal), len(output_files_regular))
        self.assertEqual(len(all_rows_minimal), len(all_rows_regular))

        # Compare sorted data
        minimal_ids = sorted([row['id'] for row in all_rows_minimal])
        regular_ids = sorted([row['id'] for row in all_rows_regular])
        self.assertEqual(minimal_ids, regular_ids)

    def test_omid_ordering_and_normalization(self):
        """Test OMID ordering and ID field normalization"""
        rows_per_file = 2
        self.merger.merge_dumps_light(self.existing_dir, self.new_dir, self.output_dir, rows_per_file)

        first_file = os.path.join(self.output_dir, 'oc_meta_data_001.csv')
        self.assertTrue(os.path.exists(first_file), "First output file should exist")

        with open(first_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
            self.assertGreater(len(lines), 1, "First file should have data rows")

            for i, line in enumerate(lines):
                if i == 0:  # Header
                    self.assertIn('id', line.lower(), "Header should contain 'id' field")
                else:
                    parts = line.split(',')
                    self.assertGreater(len(parts), 0, f"Row {i} should have CSV fields")

                    id_field = parts[0].strip('"')

                    id_parts = id_field.split()
                    omid_parts = [part for part in id_parts if part.startswith('omid:')]
                    self.assertGreater(len(omid_parts), 0, f"Row {i} should contain OMID")
                    self.assertTrue(id_parts[0].startswith('omid:'), f"Row {i}: OMID should be first in ID field")

    def test_file_structure(self):
        """Test that required test data files exist"""
        self.assertTrue(os.path.exists(self.existing_dir), f"Existing CSV directory should exist: {self.existing_dir}")
        self.assertTrue(os.path.exists(self.new_dir), f"New CSV directory should exist: {self.new_dir}")

        existing_files = [f for f in os.listdir(self.existing_dir) if f.endswith('.csv')]
        new_files = [f for f in os.listdir(self.new_dir) if f.endswith('.csv')]

        self.assertGreater(len(existing_files), 0, "Should have existing CSV files for testing")
        self.assertGreater(len(new_files), 0, "Should have new CSV files for testing")

    def test_merge_with_default_rows_per_file(self):
        """Test merge with default rows per file (3000)"""
        self.merger.merge_dumps_light(self.existing_dir, self.new_dir, self.output_dir)

        output_files = [f for f in os.listdir(self.output_dir) if f.endswith('.csv')]
        self.assertEqual(len(output_files), 1, "With default rows per file, should create only one output file")

        expected_file = 'oc_meta_data_001.csv'
        self.assertIn(expected_file, output_files, f"Should create {expected_file}")

    def test_omid_precedence(self):
        """Test OMID precedence (new files override existing)"""
        output_files, all_rows = self._run_merge_and_load_data()

        omid_002_row = next((row for row in all_rows if 'omid:br/002' in row['id']), None)
        self.assertIsNotNone(omid_002_row, "Should find OMID br/002 in output")

        self.assertIn("Updated", omid_002_row['title'], "New file should override existing file")

    def test_omid_alphabetical_ordering(self):
        """Test OMID alphabetical ordering"""
        output_files, all_rows = self._run_merge_and_load_data()

        omids_found = []
        for row in all_rows:
            omid = None
            for id_part in row['id'].split():
                if id_part.startswith('omid:'):
                    omid = id_part
                    break
            if omid:
                omids_found.append(omid)

        sorted_omids = sorted(omids_found)
        self.assertEqual(omids_found, sorted_omids, "OMIDs should be sorted alphabetically")

    def test_id_field_normalization(self):
        """Test ID field normalization (OMID first, others sorted)"""
        output_files, all_rows = self._run_merge_and_load_data()

        for i, row in enumerate(all_rows[:3]):  # Check first 3 rows
            id_field = row['id']
            id_parts = id_field.split()
            omid_parts = [part for part in id_parts if part.startswith('omid:')]
            other_parts = [part for part in id_parts if not part.startswith('omid:')]

            self.assertGreater(len(omid_parts), 0, f"Row {i+1} should have OMID")
            self.assertIn(id_parts[0], omid_parts, f"Row {i+1}: OMID should be first")
            self.assertEqual(sorted(other_parts), other_parts, f"Row {i+1}: Other IDs should be sorted")

    def test_page_field_normalization(self):
        """Test page field normalization (333-333 -> 333)"""
        output_files, all_rows = self._run_merge_and_load_data()

        page_333_row = next((row for row in all_rows if row['page'] == '333'), None)
        self.assertIsNotNone(page_333_row, "Should find normalized page field '333'")

    def test_people_field_id_normalization(self):
        """Test people field ID normalization (OMID first in brackets)"""
        output_files, all_rows = self._run_merge_and_load_data()

        for i, row in enumerate(all_rows[:2]):  # Check first 2 rows
            author_field = row['author']
            # Check if OMIDs come before other IDs in brackets
            if '[omid:ra/' in author_field and 'orcid:' in author_field:
                brackets = re.findall(r'\[([^\]]+)\]', author_field)
                for bracket_content in brackets:
                    ids_in_bracket = bracket_content.split()
                    omid_ids = [id for id in ids_in_bracket if id.startswith('omid:')]

                    if omid_ids:
                        self.assertIn(ids_in_bracket[0], omid_ids, f"OMID should be first in brackets: {bracket_content}")

    def test_progressive_file_naming(self):
        """Test progressive file naming"""
        output_files, all_rows = self._run_merge_and_load_data()

        expected_names = ['oc_meta_data_001.csv', 'oc_meta_data_002.csv', 'oc_meta_data_003.csv']
        for expected_name in expected_names:
            self.assertIn(expected_name, output_files, f"Expected file {expected_name} should exist")

    def test_file_row_counts(self):
        """Test file row counts and total rows"""
        output_files, all_rows = self._run_merge_and_load_data()

        total_rows = 0
        for file_name in output_files:
            file_path = os.path.join(self.output_dir, file_name)
            with open(file_path, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                row_count = len(list(reader))
                total_rows += row_count

        # Verify we have the expected total number of rows (6 from test data)
        self.assertEqual(total_rows, 6, "Should have 6 total rows from test data")
        self.assertEqual(len(all_rows), 6, "Loaded rows should match total count")

    def test_data_consistency(self):
        """Test data consistency and completeness"""
        output_files, all_rows = self._run_merge_and_load_data()

        # Verify all rows have required fields
        required_fields = ['id', 'title', 'author', 'pub_date', 'venue']
        for i, row in enumerate(all_rows):
            for field in required_fields:
                self.assertIn(field, row, f"Row {i+1} should have field '{field}'")

        # Verify no duplicate OMIDs
        omids = []
        for row in all_rows:
            for id_part in row['id'].split():
                if id_part.startswith('omid:'):
                    omids.append(id_part)
                    break

        unique_omids = set(omids)
        self.assertEqual(len(omids), len(unique_omids), "Should have no duplicate OMIDs")

    def test_error_handling_with_multiprocessing(self):
        """Test error handling with multiprocessing (non-existent directories)"""
        non_existent_dir = os.path.join(self.test_data_dir, 'non_existent')

        # Should handle gracefully without crashing
        merger = CSVDumpMergerLight(max_workers=2)
        merger.merge_dumps_light(non_existent_dir, self.new_dir, self.output_dir)

        # Should still process new_dir files
        self.assertTrue(os.path.exists(self.output_dir))
        output_files = [f for f in os.listdir(self.output_dir) if f.endswith('.csv')]
        self.assertGreater(len(output_files), 0, "Should create files from new_dir despite missing existing_dir")

    def test_streaming_vs_minimal_memory_consistency(self):
        """Test that streaming and minimal memory modes produce identical results"""
        # Run streaming mode
        output_files_streaming, all_rows_streaming = self._run_merge_and_load_data(rows_per_file=2)

        # Reset and run minimal memory mode
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)
        output_files_minimal, all_rows_minimal = self._run_minimal_memory_and_load_data(rows_per_file=2)

        # Compare results
        self.assertEqual(len(all_rows_streaming), len(all_rows_minimal), "Both modes should produce same number of rows")

        # Compare content (sort to handle potential ordering differences)
        streaming_content = sorted([row['id'] + '|' + row['title'] for row in all_rows_streaming])
        minimal_content = sorted([row['id'] + '|' + row['title'] for row in all_rows_minimal])
        self.assertEqual(streaming_content, minimal_content, "Both modes should produce identical content")

    def test_deduplication_functionality(self):
        """Test that duplicate OMIDs are properly handled"""
        output_files, all_rows = self._run_merge_and_load_data()

        # Extract all OMIDs
        omids_found = []
        for row in all_rows:
            for id_part in row['id'].split():
                if id_part.startswith('omid:'):
                    omids_found.append(id_part)
                    break

        # Check for duplicates
        unique_omids = set(omids_found)
        self.assertEqual(len(omids_found), len(unique_omids),
                         f"Found duplicate OMIDs: {[omid for omid in omids_found if omids_found.count(omid) > 1]}")

    def test_priority_handling_with_temp_files(self):
        """Test priority handling in merge_sorted_temp_files"""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create test temp files with different priorities
            temp_files_with_priority = []

            # Create higher priority file (0 = new file)
            temp_file1 = tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                     dir=temp_dir, suffix='.csv', encoding='utf-8')
            fieldnames = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type', 'publisher', 'editor']
            writer1 = csv.DictWriter(temp_file1, fieldnames=fieldnames)
            writer1.writeheader()
            writer1.writerow({'id': 'omid:br/002', 'title': 'Updated Article', 'author': 'New Author', 'pub_date': '2023', 'venue': '', 'volume': '', 'issue': '', 'page': '', 'type': '', 'publisher': '', 'editor': ''})
            temp_file1.close()
            temp_files_with_priority.append((temp_file1.name, 0))  # Higher priority (new file)

            # Create lower priority file (1 = existing file)
            temp_file2 = tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                     dir=temp_dir, suffix='.csv', encoding='utf-8')
            writer2 = csv.DictWriter(temp_file2, fieldnames=fieldnames)
            writer2.writeheader()
            writer2.writerow({'id': 'omid:br/002', 'title': 'Original Article', 'author': 'Old Author', 'pub_date': '2022', 'venue': '', 'volume': '', 'issue': '', 'page': '', 'type': '', 'publisher': '', 'editor': ''})
            temp_file2.close()
            temp_files_with_priority.append((temp_file2.name, 1))  # Lower priority (existing file)

            # Merge files
            merge_output_dir = os.path.join(temp_dir, 'merge_output')
            merge_sorted_temp_files(temp_files_with_priority, merge_output_dir, total_rows=2, rows_per_file=10)

            # Verify that higher priority (new file) won
            output_files = [f for f in os.listdir(merge_output_dir) if f.endswith('.csv')]
            self.assertGreater(len(output_files), 0)

            first_file = os.path.join(merge_output_dir, sorted(output_files)[0])
            with open(first_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                rows = list(reader)

                # Should only have one row (the higher priority one)
                self.assertEqual(len(rows), 1)
                # Should be the updated article (from higher priority file)
                self.assertIn('Updated', rows[0]['title'])
                self.assertIn('New Author', rows[0]['author'])


if __name__ == '__main__':
    unittest.main(verbosity=2)