# Coverage for test/merge_csv_dumps_test.py: 98% (549 statements)

import csv
import os
import sys
import tempfile
import unittest
from unittest.mock import Mock, patch

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from oc_meta.run.merge_csv_dumps import (CSVDumpMerger, _process_new_file,
                                         _process_single_file,
                                         get_existing_output_files,
                                         normalize_ids_in_brackets,
                                         normalize_ids_in_field,
                                         normalize_page_field,
                                         normalize_people_field,
                                         postprocess_type,
                                         process_ordered_list)


class TestNormalizationFunctions(unittest.TestCase):
    """Test the ID normalization functions"""

    def test_normalize_ids_in_field(self):
        """Test normalization of space-separated ID lists"""
        test_cases = [
            # Basic case: OMID should come first, others alphabetically
            ("doi:10.1000/123 pmid:456 omid:br/789", "omid:br/789 doi:10.1000/123 pmid:456"),
            # Multiple OMIDs (should be sorted among themselves too)
            ("pmid:456 omid:br/789 omid:br/123 doi:10.1000/abc", "omid:br/123 omid:br/789 doi:10.1000/abc pmid:456"),
            # No OMID
            ("doi:10.1000/123 pmid:456", "doi:10.1000/123 pmid:456"),
            # Empty string
            ("", ""),
            # None input
            (None, ""),
        ]

        for input_val, expected in test_cases:
            with self.subTest(input_val=input_val):
                result = normalize_ids_in_field(input_val)
                self.assertEqual(result, expected)

    def test_normalize_ids_in_brackets(self):
        """Test normalization of IDs within square brackets"""
        test_cases = [
            # Basic case with person name
            ("John Doe [doi:10.1000/123 omid:ra/456]", "John Doe [omid:ra/456 doi:10.1000/123]"),
            # Multiple brackets in same string
            ("John [doi:123 omid:ra/456] and Jane [pmid:789 omid:ra/abc]",
             "John [omid:ra/456 doi:123] and Jane [omid:ra/abc pmid:789]"),
            # Empty brackets
            ("Name []", "Name []"),
            # No brackets
            ("John Doe", "John Doe"),
        ]

        for input_val, expected in test_cases:
            with self.subTest(input_val=input_val):
                result = normalize_ids_in_brackets(input_val)
                self.assertEqual(result, expected)

    def test_normalize_people_field(self):
        """Test normalization of people fields (author, editor, publisher)"""
        test_cases = [
            # Single person
            ("John Doe [doi:10.1000/123 omid:ra/456]", "John Doe [omid:ra/456 doi:10.1000/123]"),
            # Multiple people (order should be preserved, but IDs normalized)
            ("Smith, John [orcid:0000-0000-0000-0000 omid:ra/123]; Doe, Jane [omid:ra/456 doi:10.1000/abc]",
             "Smith, John [omid:ra/123 orcid:0000-0000-0000-0000]; Doe, Jane [omid:ra/456 doi:10.1000/abc]"),
            # Empty field
            ("", ""),
        ]

        for input_val, expected in test_cases:
            with self.subTest(input_val=input_val):
                result = normalize_people_field(input_val)
                self.assertEqual(result, expected)

    def test_normalize_page_field(self):
        """Test normalization of page fields"""
        test_cases = [
            # Identical start and end pages should be simplified
            ("333-333", "333"),
            ("1-1", "1"),
            # Different start and end pages should remain unchanged
            ("333-334", "333-334"),
            ("1-10", "1-10"),
            # Single pages should remain unchanged
            ("333", "333"),
            # Empty or None should return empty
            ("", ""),
            (None, ""),
        ]

        for input_val, expected in test_cases:
            with self.subTest(input_val=input_val):
                result = normalize_page_field(input_val)
                self.assertEqual(result, expected)
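

# ---------------------------------------------------------------------------
# Illustrative sketches (NOT part of oc_meta): the expectations above pin down
# the assumed behaviour of normalize_ids_in_field and normalize_page_field.
# Minimal reference implementations consistent with those test cases could
# look like the helpers below; the real functions may differ in details.
def _sketch_normalize_ids_in_field(value):
    """Put omid:* identifiers first (sorted), then the remaining IDs alphabetically."""
    if not value:
        return ""
    ids = value.split()
    omids = sorted(i for i in ids if i.startswith("omid:"))
    others = sorted(i for i in ids if not i.startswith("omid:"))
    return " ".join(omids + others)


def _sketch_normalize_page_field(value):
    """Collapse ranges with identical start and end pages, e.g. '333-333' -> '333'."""
    if not value:
        return ""
    parts = value.split("-")
    if len(parts) == 2 and parts[0] == parts[1]:
        return parts[0]
    return value
# ---------------------------------------------------------------------------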


class TestCSVDumpMerger(unittest.TestCase):
    """Test CSVDumpMerger class methods"""

    def setUp(self):
        self.merger = CSVDumpMerger("http://example.com/sparql")

    def test_extract_omid_from_id_field(self):
        """Test extraction of OMID from ID field"""
        test_cases = [
            ("doi:10.1007/978-3-662-07918-8_3 omid:br/0612345", "omid:br/0612345"),
            ("omid:br/0612345 doi:10.1007/978-3-662-07918-8_3", "omid:br/0612345"),
            ("doi:10.1007/978-3-662-07918-8_3", None),
            ("omid:br/0612345", "omid:br/0612345"),
            ("", None),
            (None, None),
        ]

        for id_field, expected in test_cases:
            with self.subTest(id_field=id_field):
                result = self.merger.extract_omid_from_id_field(id_field)
                self.assertEqual(result, expected)

    def test_build_sparql_query(self):
        """Test SPARQL query building with OMID values"""
        omids = ["omid:br/0612345", "omid:br/0612346"]
        query = self.merger.build_sparql_query(omids)

        self.assertIn("VALUES ?res", query)
        self.assertIn("<https://w3id.org/oc/meta/br/0612345>", query)
        self.assertIn("<https://w3id.org/oc/meta/br/0612346>", query)
        self.assertIn("PREFIX foaf:", query)
        self.assertIn("SELECT DISTINCT", query)

    def test_normalize_row_data(self):
        """Test row data normalization with ID ordering"""
        test_row = {
            "id": " doi:10.1000/123 omid:br/0612345 ",
            "title": "Test Title ",
            "author": "John Doe [doi:456 omid:ra/789]; Jane Smith [omid:ra/abc]",
            "pub_date": "2023",
            "venue": "Journal [issn:1234 omid:br/journal]",
            "publisher": "Publisher [crossref:123 omid:ra/pub]",
            "page": "333-333"
        }

        normalized = self.merger.normalize_row_data(test_row)

        # ID field should have OMID first
        self.assertEqual(normalized["id"], "omid:br/0612345 doi:10.1000/123")
        self.assertEqual(normalized["title"], "Test Title")
        # Author field should have normalized IDs in brackets but preserve people order
        self.assertEqual(normalized["author"], "John Doe [omid:ra/789 doi:456]; Jane Smith [omid:ra/abc]")
        self.assertEqual(normalized["pub_date"], "2023")
        # Venue and publisher should have normalized IDs
        self.assertEqual(normalized["venue"], "Journal [omid:br/journal issn:1234]")
        self.assertEqual(normalized["publisher"], "Publisher [omid:ra/pub crossref:123]")
        # Page field should be simplified when start and end are identical
        self.assertEqual(normalized["page"], "333")

    def test_normalize_row_data_with_none_values(self):
        """Test row data normalization with None values"""
        test_row = {
            "id": None,
            "title": None,
            "author": None,
            "pub_date": None,
        }

        normalized = self.merger.normalize_row_data(test_row)

        for key, value in normalized.items():
            self.assertEqual(value, "")

    def test_rows_are_different(self):
        """Test comparison of rows for differences"""
        row1 = {
            "id": "omid:br/0612345",
            "title": "Original Title",
            "author": "Author 1",
            "pub_date": "2023",
            "venue": "Journal A",
            "volume": "1",
            "issue": "1",
            "page": "1-10",
            "type": "article",
            "publisher": "Publisher A",
            "editor": "Editor A"
        }

        row2_same = row1.copy()
        row2_different = row1.copy()
        row2_different["title"] = "Different Title"

        # Test without logging
        self.assertFalse(self.merger.rows_are_different(row1, row2_same, log_differences=False))
        self.assertTrue(self.merger.rows_are_different(row1, row2_different, log_differences=False))

    def test_rows_are_different_id_ordering_only(self):
        """Test that rows differing only in ID ordering are considered the same"""
        row1 = {
            "id": "doi:10.1000/123 omid:br/0612345",
            "author": "John [doi:456 omid:ra/789]; Jane [omid:ra/abc pmid:999]",
            "venue": "Journal [issn:1234 omid:br/journal]"
        }

        row2 = {
            "id": "omid:br/0612345 doi:10.1000/123",  # Different order
            "author": "John [omid:ra/789 doi:456]; Jane [pmid:999 omid:ra/abc]",  # Different ID order
            "venue": "Journal [omid:br/journal issn:1234]"  # Different ID order
        }

        # These should be considered the same after normalization
        self.assertFalse(self.merger.rows_are_different(row1, row2, log_differences=False))

    def test_rows_are_different_page_normalization(self):
        """Test that rows differing only in page format (333-333 vs 333) are considered the same"""
        row1 = {
            "id": "omid:br/0612345",
            "title": "Test Title",
            "page": "333-333"
        }

        row2 = {
            "id": "omid:br/0612345",
            "title": "Test Title",
            "page": "333"
        }

        # These should be considered the same after page normalization
        self.assertFalse(self.merger.rows_are_different(row1, row2, log_differences=False))

        # But different page ranges should still be detected as different
        row3 = {
            "id": "omid:br/0612345",
            "title": "Test Title",
            "page": "333-334"
        }

        self.assertTrue(self.merger.rows_are_different(row1, row3, log_differences=False))

    def test_get_all_csv_files(self):
        """Test getting CSV files from directory"""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create test files
            csv_files = ["test1.csv", "test2.csv"]
            other_files = ["test.txt", "test.json"]

            for filename in csv_files + other_files:
                filepath = os.path.join(temp_dir, filename)
                with open(filepath, 'w') as f:
                    f.write("test")

            result = self.merger.get_all_csv_files(temp_dir)

            self.assertEqual(len(result), 2)
            result_basenames = [os.path.basename(f) for f in result]
            self.assertIn("test1.csv", result_basenames)
            self.assertIn("test2.csv", result_basenames)

    def test_get_all_csv_files_nonexistent_dir(self):
        """Test getting CSV files from non-existent directory"""
        result = self.merger.get_all_csv_files("/non/existent/path")
        self.assertEqual(result, [])

    def test_constructor_parameters(self):
        """Test CSVDumpMerger constructor parameters"""
        # Test with all parameters
        merger1 = CSVDumpMerger("http://example.com/sparql", batch_size=100)
        self.assertEqual(merger1.endpoint_url, "http://example.com/sparql")
        self.assertEqual(merger1.batch_size, 100)

        # Test with defaults
        merger2 = CSVDumpMerger("http://example.com/sparql")
        self.assertEqual(merger2.batch_size, 50)

        # Test with empty endpoint - should raise ValueError
        with self.assertRaises(ValueError) as context:
            CSVDumpMerger("")
        self.assertIn("SPARQL endpoint URL is mandatory", str(context.exception))

    @patch('oc_meta.run.merge_csv_dumps.SPARQLWrapper')
    def test_execute_sparql_query(self, mock_sparql_wrapper):
        """Test SPARQL query execution"""
        mock_results = {
            "results": {
                "bindings": [
                    {
                        "id": {"value": "doi:10.1234 omid:br/0612345"},
                        "title": {"value": "Test Title"},
                        "type": {"value": "http://purl.org/spar/fabio/JournalArticle"},
                        "author": {"value": "Test Author"}
                    }
                ]
            }
        }

        mock_sparql = Mock()
        mock_sparql.query.return_value.convert.return_value = mock_results
        mock_sparql_wrapper.return_value = mock_sparql

        merger = CSVDumpMerger("http://example.com/sparql")
        merger.sparql = mock_sparql

        query = "SELECT * WHERE { ?s ?p ?o }"
        result = merger.execute_sparql_query(query)

        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]["id"], "doi:10.1234 omid:br/0612345")
        self.assertEqual(result[0]["title"], "Test Title")
        self.assertEqual(result[0]["type"], "journal article")
        self.assertEqual(result[0]["author"], "Test Author")

    @patch.object(CSVDumpMerger, 'execute_sparql_query')
    def test_verify_file_data(self, mock_execute):
        """Test verification of file data against database"""
        omids = ["omid:br/0612345", "omid:br/0612346"]

        mock_db_results = [
            {"id": "doi:10.1234 omid:br/0612345", "title": "Updated Title 1"},
        ]
        mock_execute.return_value = mock_db_results

        result, query_failed = self.merger.verify_file_data(omids)

        self.assertFalse(query_failed)
        self.assertEqual(len(result), 1)
        self.assertIn("omid:br/0612345", result)
        self.assertEqual(result["omid:br/0612345"]["title"], "Updated Title 1")

    @patch.object(CSVDumpMerger, 'execute_sparql_query')
    def test_verify_file_data_query_failure(self, mock_execute):
        """Test verification when all queries fail"""
        omids = ["omid:br/0612345", "omid:br/0612346"]

        # Mock query failure
        mock_execute.return_value = None

        result, query_failed = self.merger.verify_file_data(omids)

        self.assertTrue(query_failed)
        self.assertEqual(len(result), 0)
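

# ---------------------------------------------------------------------------
# Illustrative sketch (NOT part of oc_meta): test_build_sparql_query above
# asserts that every "omid:br/XXXX" appears as <https://w3id.org/oc/meta/br/XXXX>
# inside a "VALUES ?res" clause. A minimal helper consistent with those
# assertions (the real query builder is richer) could be:
def _sketch_omids_to_values_clause(omids):
    """Turn ['omid:br/0612345', ...] into the assumed SPARQL VALUES block."""
    iris = " ".join(
        f"<https://w3id.org/oc/meta/{omid.replace('omid:', '', 1)}>" for omid in omids
    )
    return f"VALUES ?res {{ {iris} }}"
# ---------------------------------------------------------------------------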


class TestGetExistingOutputFiles(unittest.TestCase):
    """Test the get_existing_output_files function"""

    def test_get_existing_output_files_empty_dir(self):
        """Test getting existing files from empty directory"""
        with tempfile.TemporaryDirectory() as temp_dir:
            result = get_existing_output_files(temp_dir)
            self.assertEqual(result, set())

    def test_get_existing_output_files_nonexistent_dir(self):
        """Test getting existing files from non-existent directory"""
        result = get_existing_output_files("/non/existent/path")
        self.assertEqual(result, set())

    def test_get_existing_output_files_with_files(self):
        """Test getting existing files from directory with CSV files"""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create test files
            csv_files = ["test1.csv", "test2.csv"]
            other_files = ["test.txt", "test.json"]

            for filename in csv_files + other_files:
                filepath = os.path.join(temp_dir, filename)
                with open(filepath, 'w') as f:
                    f.write("test")

            result = get_existing_output_files(temp_dir)

            self.assertEqual(result, {"test1.csv", "test2.csv"})
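

# ---------------------------------------------------------------------------
# Illustrative sketch (NOT part of oc_meta): per the tests above,
# get_existing_output_files is assumed to return the set of *.csv basenames in
# the output directory, and an empty set when the directory does not exist.
def _sketch_get_existing_output_files(output_dir):
    """Return {'file.csv', ...} for output_dir, or an empty set if it is missing."""
    if not os.path.isdir(output_dir):
        return set()
    return {name for name in os.listdir(output_dir) if name.endswith(".csv")}
# ---------------------------------------------------------------------------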


class TestProcessNewFile(unittest.TestCase):
    """Test the _process_new_file function"""

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory()
        self.output_dir = os.path.join(self.temp_dir.name, "output")
        os.makedirs(self.output_dir)

    def tearDown(self):
        self.temp_dir.cleanup()

    def create_test_csv(self, filename, data):
        """Helper method to create test CSV files"""
        filepath = os.path.join(self.temp_dir.name, filename)
        if data:
            with open(filepath, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=data[0].keys())
                writer.writeheader()
                writer.writerows(data)
        return filepath

    @patch('oc_meta.run.merge_csv_dumps.get_csv_data_fast')
    def test_process_new_file_empty(self, mock_get_csv):
        """Test processing empty new file"""
        mock_get_csv.return_value = []

        filepath = os.path.join(self.temp_dir.name, "empty.csv")
        args = (filepath, self.output_dir, "http://example.com/sparql", set())  # No existing files

        result = _process_new_file(args)
        output_file, row_count, updated_count, filename, file_omids, skipped = result

        self.assertIsNone(output_file)
        self.assertEqual(row_count, 0)
        self.assertEqual(updated_count, 0)
        self.assertEqual(filename, "empty.csv")
        self.assertEqual(file_omids, set())
        self.assertFalse(skipped)

    @patch('oc_meta.run.merge_csv_dumps.get_csv_data_fast')
    def test_process_new_file_with_omids(self, mock_get_csv):
        """Test processing new file with OMIDs (collects OMIDs, normalizes data)"""
        test_data = [
            {
                "id": "doi:10.1000/123 omid:br/0612345",
                "title": "Title 1 ",  # Extra spaces to test normalization
                "author": "Author [doi:456 omid:ra/789]",  # Test ID normalization
                "page": "333-333"  # Test page normalization
            },
            {
                "id": "pmid:456 omid:br/0612346",
                "title": "Title 2",
                "author": "Author 2 [omid:ra/abc]"
            }
        ]

        mock_get_csv.return_value = test_data

        filepath = self.create_test_csv("new.csv", test_data)
        args = (filepath, self.output_dir, "http://example.com/sparql", set())  # No existing files

        result = _process_new_file(args)
        output_file, row_count, updated_count, filename, file_omids, skipped = result

        self.assertIsNotNone(output_file)
        self.assertEqual(row_count, 2)
        self.assertEqual(updated_count, 0)  # No database updates for new files
        self.assertEqual(filename, "new.csv")
        self.assertEqual(file_omids, {"omid:br/0612345", "omid:br/0612346"})
        self.assertFalse(skipped)

        # Check output file content for normalization
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            output_rows = list(reader)

        self.assertEqual(len(output_rows), 2)
        # Check ID normalization (OMID first)
        self.assertEqual(output_rows[0]["id"], "omid:br/0612345 doi:10.1000/123")
        self.assertEqual(output_rows[1]["id"], "omid:br/0612346 pmid:456")
        # Check title normalization (trimmed spaces)
        self.assertEqual(output_rows[0]["title"], "Title 1")
        # Check author ID normalization
        self.assertEqual(output_rows[0]["author"], "Author [omid:ra/789 doi:456]")
        # Check page normalization
        self.assertEqual(output_rows[0]["page"], "333")

    @patch('oc_meta.run.merge_csv_dumps.get_csv_data_fast')
    def test_process_new_file_without_omids(self, mock_get_csv):
        """Test processing new file without OMIDs"""
        test_data = [
            {"id": "doi:10.1000/123", "title": "Title 1", "author": "Author 1"},
            {"id": "pmid:456", "title": "Title 2", "author": "Author 2"}
        ]

        mock_get_csv.return_value = test_data

        filepath = self.create_test_csv("new.csv", test_data)
        args = (filepath, self.output_dir, "http://example.com/sparql", set())  # No existing files

        result = _process_new_file(args)
        output_file, row_count, updated_count, filename, file_omids, skipped = result

        # Since rows without OMID are now skipped, we expect no output
        self.assertIsNone(output_file)
        self.assertEqual(row_count, 0)
        self.assertEqual(updated_count, 0)
        self.assertEqual(filename, "new.csv")
        self.assertEqual(file_omids, set())  # No OMIDs found
        self.assertFalse(skipped)

    @patch('oc_meta.run.merge_csv_dumps.get_csv_data_fast')
    def test_process_new_file_cached_empty(self, mock_get_csv):
        """Test processing empty cached new file"""
        # Mock empty file
        mock_get_csv.return_value = []

        filepath = os.path.join(self.temp_dir.name, "cached.csv")
        existing_files = {"cached.csv"}  # File already exists
        args = (filepath, self.output_dir, "http://example.com/sparql", existing_files)

        result = _process_new_file(args)
        output_file, row_count, updated_count, filename, file_omids, skipped = result

        # Empty file returns None even if cached
        self.assertIsNone(output_file)
        self.assertEqual(row_count, 0)
        self.assertEqual(updated_count, 0)
        self.assertEqual(filename, "cached.csv")
        self.assertEqual(file_omids, set())  # Empty because file is empty
        self.assertFalse(skipped)  # Not actually skipped, just empty

    @patch('oc_meta.run.merge_csv_dumps.get_csv_data_fast')
    def test_process_new_file_cached_extracts_omids(self, mock_get_csv):
        """Test that cached new file still extracts OMIDs for exclusion"""
        test_data = [
            {
                "id": "doi:10.1000/123 omid:br/0612345",
                "title": "Title 1",
                "author": "Author 1"
            },
            {
                "id": "pmid:456 omid:br/0612346",
                "title": "Title 2",
                "author": "Author 2"
            }
        ]

        mock_get_csv.return_value = test_data

        filepath = os.path.join(self.temp_dir.name, "cached.csv")
        existing_files = {"cached.csv"}  # File already exists (cached)
        args = (filepath, self.output_dir, "http://example.com/sparql", existing_files)

        result = _process_new_file(args)
        output_file, row_count, updated_count, filename, file_omids, skipped = result

        # Should be skipped due to cache but OMIDs should still be extracted
        self.assertEqual(output_file, "cached.csv")  # Returns filename when skipped
        self.assertEqual(row_count, 0)  # No rows written (cached)
        self.assertEqual(updated_count, 0)
        self.assertEqual(filename, "cached.csv")
        self.assertEqual(file_omids, {"omid:br/0612345", "omid:br/0612346"})  # OMIDs should be extracted!
        self.assertTrue(skipped)
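

# ---------------------------------------------------------------------------
# Illustrative sketch (NOT part of oc_meta): the tests above imply the assumed
# _process_new_file contract:
#   args   = (input_path, output_dir, endpoint_url, existing_output_basenames)
#   result = (output_file, row_count, updated_count, filename, file_omids, skipped)
# A small helper showing how a caller might summarise such a result tuple:
def _sketch_summarize_new_file_result(result):
    """Return a one-line, human-readable summary of a _process_new_file result."""
    output_file, row_count, updated_count, filename, file_omids, skipped = result
    if skipped:
        status = "skipped (cached)"
    elif output_file is None:
        status = "no output (empty file or no OMIDs)"
    else:
        status = f"written to {output_file}"
    return f"{filename}: {status}, rows={row_count}, updated={updated_count}, omids={len(file_omids)}"
# ---------------------------------------------------------------------------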


class TestProcessSingleFile(unittest.TestCase):
    """Test the _process_single_file function"""

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory()
        self.output_dir = os.path.join(self.temp_dir.name, "output")
        os.makedirs(self.output_dir)

    def tearDown(self):
        self.temp_dir.cleanup()

    def create_test_csv(self, filename, data):
        """Helper method to create test CSV files"""
        filepath = os.path.join(self.temp_dir.name, filename)
        if data:
            with open(filepath, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=data[0].keys())
                writer.writeheader()
                writer.writerows(data)
        return filepath

    @patch('oc_meta.run.merge_csv_dumps.get_csv_data_fast')
    def test_process_single_file_empty(self, mock_get_csv):
        """Test processing empty file"""
        mock_get_csv.return_value = []

        filepath = os.path.join(self.temp_dir.name, "empty.csv")
        args = (filepath, self.output_dir, "http://example.com/sparql", 50, False, set(), set())  # No existing files

        result = _process_single_file(args)
        self.assertEqual(result, (None, 0, 0, "empty.csv", False))

    @patch('oc_meta.run.merge_csv_dumps.get_csv_data_fast')
    @patch.object(CSVDumpMerger, 'verify_file_data')
    def test_process_single_file_with_updates(self, mock_verify, mock_get_csv):
        """Test processing file with database updates"""
        test_data = [
            {"id": "omid:br/0612345", "title": "Old Title", "author": "Author 1"}
        ]

        mock_get_csv.return_value = test_data
        mock_verify.return_value = ({
            "omid:br/0612345": {"id": "omid:br/0612345", "title": "New Title", "author": "Author 1"}
        }, False)  # Database results, no query failure

        filepath = self.create_test_csv("test.csv", test_data)
        excluded_omids = set()
        args = (filepath, self.output_dir, "http://example.com/sparql", 50, False, excluded_omids, set())  # No existing files

        result = _process_single_file(args)
        output_file, row_count, updated_count, filename, skipped = result

        self.assertIsNotNone(output_file)
        self.assertEqual(row_count, 1)
        self.assertEqual(updated_count, 1)
        self.assertEqual(filename, "test.csv")
        self.assertFalse(skipped)

        # Check output file content - should have updated title
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            output_rows = list(reader)

        self.assertEqual(len(output_rows), 1)
        self.assertEqual(output_rows[0]["title"], "New Title")

    def test_process_single_file_cached(self):
        """Test processing single file that already exists in output (cached)"""
        filepath = os.path.join(self.temp_dir.name, "cached.csv")
        # Create a dummy file (doesn't matter what's in it for this test)
        with open(filepath, 'w') as f:
            f.write("dummy")

        existing_files = {"cached.csv"}  # File already exists
        args = (filepath, self.output_dir, "http://example.com/sparql", 50, False, set(), existing_files)

        result = _process_single_file(args)
        output_file, row_count, updated_count, filename, skipped = result

        # Should be skipped due to cache
        self.assertEqual(output_file, "cached.csv")  # Returns filename when skipped
        self.assertEqual(row_count, 0)
        self.assertEqual(updated_count, 0)
        self.assertEqual(filename, "cached.csv")
        self.assertTrue(skipped)
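

# ---------------------------------------------------------------------------
# Illustrative sketch (NOT part of oc_meta): the tests above imply a different
# (assumed) argument shape for _process_single_file than for _process_new_file:
#   args   = (input_path, output_dir, endpoint_url, batch_size, verbose_diff,
#             excluded_omids, existing_output_basenames)
#   result = (output_file, row_count, updated_count, filename, skipped)
# A helper a caller might use to assemble that positional tuple:
def _sketch_build_single_file_args(input_path, output_dir, endpoint_url,
                                   excluded_omids=frozenset(), existing_files=frozenset(),
                                   batch_size=50, verbose_diff=False):
    """Assemble the positional argument tuple assumed by _process_single_file."""
    return (input_path, output_dir, endpoint_url, batch_size, verbose_diff,
            set(excluded_omids), set(existing_files))
# ---------------------------------------------------------------------------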


class TestCSVDumpMergerIntegration(unittest.TestCase):
    """Integration tests using temporary files and mock SPARQL endpoint"""

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory()
        self.existing_dir = os.path.join(self.temp_dir.name, "existing")
        self.new_dir = os.path.join(self.temp_dir.name, "new")
        self.output_dir = os.path.join(self.temp_dir.name, "output")

        os.makedirs(self.existing_dir)
        os.makedirs(self.new_dir)
        os.makedirs(self.output_dir)

    def tearDown(self):
        self.temp_dir.cleanup()

    def create_test_csv(self, directory, filename, data):
        """Helper method to create test CSV files"""
        filepath = os.path.join(directory, filename)
        if data:
            with open(filepath, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=data[0].keys())
                writer.writeheader()
                writer.writerows(data)
        return filepath

    @patch.object(CSVDumpMerger, 'execute_sparql_query')
    def test_complete_merge_workflow(self, mock_execute):
        """Test complete merge workflow with file I/O"""
        existing_data = [
            {
                "id": "doi:10.1234 omid:br/0612345",
                "title": "Original Title",
                "author": "Author 1",
                "pub_date": "2023",
                "venue": "Journal A",
                "volume": "1",
                "issue": "1",
                "page": "1-10",
                "type": "article",
                "publisher": "Publisher A",
                "editor": "Editor A"
            }
        ]

        new_data = [
            {
                "id": "doi:10.5678 omid:br/0612346",
                "title": "New Article",
                "author": "Author 2",
                "pub_date": "2024",
                "venue": "Journal B",
                "volume": "2",
                "issue": "1",
                "page": "11-20",
                "type": "article",
                "publisher": "Publisher B",
                "editor": "Editor B"
            }
        ]

        mock_db_results = [
            {
                "id": "doi:10.1234 updated_doi:10.1234-v2 omid:br/0612345",
                "title": "Updated Title from DB",
                "author": "Author 1",
                "pub_date": "2023",
                "venue": "Journal A",
                "volume": "1",
                "issue": "1",
                "page": "1-10",
                "type": "article",
                "publisher": "Publisher A",
                "editor": "Editor A"
            }
        ]

        mock_execute.return_value = mock_db_results

        self.create_test_csv(self.existing_dir, "existing.csv", existing_data)
        self.create_test_csv(self.new_dir, "new.csv", new_data)

        merger = CSVDumpMerger("http://example.com/sparql", batch_size=10)
        merger.merge_dumps(self.existing_dir, self.new_dir, self.output_dir, max_workers=1, verbose_diff=False)

        # Check that output files are created
        output_files = [f for f in os.listdir(self.output_dir) if f.endswith('.csv')]
        self.assertEqual(len(output_files), 2)  # One for each input file

        # Check existing file output
        existing_output = os.path.join(self.output_dir, "existing.csv")
        self.assertTrue(os.path.exists(existing_output))

        with open(existing_output, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            existing_rows = list(reader)

        self.assertEqual(len(existing_rows), 1)
        self.assertEqual(existing_rows[0]['title'], 'Updated Title from DB')
        self.assertIn('updated_doi:10.1234-v2', existing_rows[0]['id'])

        # Check new file output - should be normalized but not verified against database
        new_output = os.path.join(self.output_dir, "new.csv")
        self.assertTrue(os.path.exists(new_output))

        with open(new_output, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            new_rows = list(reader)

        self.assertEqual(len(new_rows), 1)
        self.assertEqual(new_rows[0]['title'], 'New Article')
        # Check that new file IDs are normalized (OMID first)
        self.assertEqual(new_rows[0]['id'], 'omid:br/0612346 doi:10.5678')

    @patch.object(CSVDumpMerger, 'execute_sparql_query')
    def test_merge_dumps_with_caching(self, mock_execute):
        """Test that files are skipped when they already exist in output directory"""
        existing_data = [
            {"id": "omid:br/0612345", "title": "Title 1", "author": "Author 1"}
        ]

        new_data = [
            {"id": "omid:br/0612346", "title": "Title 2", "author": "Author 2"}
        ]

        # Mock SPARQL query results
        mock_execute.return_value = [
            {"id": "omid:br/0612345", "title": "Title 1", "author": "Author 1"}
        ]

        # Create input files
        self.create_test_csv(self.existing_dir, "existing.csv", existing_data)
        self.create_test_csv(self.new_dir, "new.csv", new_data)

        # Pre-create output files to simulate cache
        self.create_test_csv(self.output_dir, "existing.csv", [{"id": "cached", "title": "Cached"}])
        self.create_test_csv(self.output_dir, "new.csv", [{"id": "cached", "title": "Cached"}])

        merger = CSVDumpMerger("http://example.com/sparql")
        merger.merge_dumps(self.existing_dir, self.new_dir, self.output_dir, max_workers=1, verbose_diff=False)

        # Files should remain unchanged (cached versions)
        existing_output = os.path.join(self.output_dir, "existing.csv")
        with open(existing_output, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            existing_rows = list(reader)

        self.assertEqual(len(existing_rows), 1)
        self.assertEqual(existing_rows[0]['title'], 'Cached')  # Should remain cached version

    @patch.object(CSVDumpMerger, 'execute_sparql_query')
    def test_merge_dumps_query_failure(self, mock_execute):
        """Test merge_dumps when SPARQL queries fail"""
        existing_data = [
            {"id": "omid:br/0612345", "title": "Title 1", "author": "Author 1"}
        ]

        self.create_test_csv(self.existing_dir, "existing.csv", existing_data)

        # Mock query failure
        mock_execute.return_value = None

        merger = CSVDumpMerger("http://example.com/sparql")
        merger.merge_dumps(self.existing_dir, self.new_dir, self.output_dir, max_workers=1, verbose_diff=False)

        # File should be skipped due to query failure, no output file created
        output_files = [f for f in os.listdir(self.output_dir) if f.endswith('.csv')]
        self.assertEqual(len(output_files), 0)

    @patch.object(CSVDumpMerger, 'execute_sparql_query')
    def test_merge_dumps_omid_exclusion(self, mock_execute):
        """Test that OMIDs from new files are excluded from existing files"""
        # Both files contain the same OMID - new file should take precedence
        existing_data = [
            {
                "id": "doi:10.1234 omid:br/0612345",
                "title": "Old Version",
                "author": "Author 1"
            },
            {
                "id": "doi:10.5678 omid:br/0612346",
                "title": "Only in Existing",
                "author": "Author 2"
            }
        ]

        new_data = [
            {
                "id": "pmid:999 omid:br/0612345",  # Same OMID as in existing
                "title": "New Version",
                "author": "Author 1 Updated"
            }
        ]

        # Mock database would return updated data for OMID 0612346 only
        mock_db_results = [
            {
                "id": "doi:10.5678 updated_doi:10.5678-v2 omid:br/0612346",
                "title": "Updated from DB",
                "author": "Author 2"
            }
        ]

        mock_execute.return_value = mock_db_results

        self.create_test_csv(self.existing_dir, "existing.csv", existing_data)
        self.create_test_csv(self.new_dir, "new.csv", new_data)

        merger = CSVDumpMerger("http://example.com/sparql", batch_size=10)
        merger.merge_dumps(self.existing_dir, self.new_dir, self.output_dir, max_workers=1, verbose_diff=False)

        # Check new file output - should contain the new version
        new_output = os.path.join(self.output_dir, "new.csv")
        with open(new_output, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            new_rows = list(reader)

        self.assertEqual(len(new_rows), 1)
        self.assertEqual(new_rows[0]['title'], 'New Version')
        self.assertEqual(new_rows[0]['id'], 'omid:br/0612345 pmid:999')  # Normalized

        # Check existing file output - should only contain OMID 0612346 (0612345 excluded)
        existing_output = os.path.join(self.output_dir, "existing.csv")
        with open(existing_output, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            existing_rows = list(reader)

        self.assertEqual(len(existing_rows), 1)  # Only one row (the excluded OMID was filtered out)
        self.assertEqual(existing_rows[0]['title'], 'Updated from DB')
        self.assertIn('omid:br/0612346', existing_rows[0]['id'])
        self.assertNotIn('omid:br/0612345', existing_rows[0]['id'])  # Should not contain excluded OMID

    @patch.object(CSVDumpMerger, 'execute_sparql_query')
    def test_merge_dumps_cached_new_files_still_exclude_omids(self, mock_execute):
        """Test that OMIDs from cached new files are still excluded from existing files"""
        # This is the critical test: even if new files are skipped due to caching,
        # their OMIDs should still be extracted and excluded from existing files

        existing_data = [
            {
                "id": "doi:10.1234 omid:br/0612345",
                "title": "Existing Version",
                "author": "Author 1"
            },
            {
                "id": "doi:10.5678 omid:br/0612346",
                "title": "Only in Existing",
                "author": "Author 2"
            }
        ]

        new_data = [
            {
                "id": "pmid:999 omid:br/0612345",  # Same OMID as in existing
                "title": "Cached New Version",
                "author": "Author 1 Updated"
            }
        ]

        # Create input files
        self.create_test_csv(self.existing_dir, "existing.csv", existing_data)
        self.create_test_csv(self.new_dir, "new.csv", new_data)

        # Pre-create the new output file to simulate cache
        self.create_test_csv(self.output_dir, "new.csv", [{"id": "cached", "title": "Cached"}])

        # Mock database would return data for both OMIDs, but 0612345 should be excluded
        mock_db_results = [
            {
                "id": "doi:10.5678 updated_doi:10.5678-v2 omid:br/0612346",
                "title": "Updated from DB",
                "author": "Author 2"
            }
        ]

        mock_execute.return_value = mock_db_results

        merger = CSVDumpMerger("http://example.com/sparql", batch_size=10)
        merger.merge_dumps(self.existing_dir, self.new_dir, self.output_dir, max_workers=1, verbose_diff=False)

        # Check that new file remains cached
        new_output = os.path.join(self.output_dir, "new.csv")
        with open(new_output, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            new_rows = list(reader)

        self.assertEqual(len(new_rows), 1)
        self.assertEqual(new_rows[0]['title'], 'Cached')  # Should remain cached version

        # Check existing file output - should only contain OMID 0612346 (0612345 excluded even though new file was cached)
        existing_output = os.path.join(self.output_dir, "existing.csv")
        with open(existing_output, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            existing_rows = list(reader)

        self.assertEqual(len(existing_rows), 1)  # Only one row (the excluded OMID was filtered out)
        self.assertEqual(existing_rows[0]['title'], 'Updated from DB')
        self.assertIn('omid:br/0612346', existing_rows[0]['id'])
        self.assertNotIn('omid:br/0612345', existing_rows[0]['id'])  # Should not contain excluded OMID

    def test_complete_file_based_caching_with_omid_exclusion(self):
        """Test complete workflow using real files to verify OMID exclusion works with caching"""
        # This test uses actual file I/O without mocking to test the complete behavior

        existing_data = [
            {
                "id": "doi:10.1234 omid:br/0612345",
                "title": "Existing Version",
                "author": "Author 1",
                "pub_date": "2023",
                "venue": "",
                "volume": "",
                "issue": "",
                "page": "",
                "type": "",
                "publisher": "",
                "editor": ""
            },
            {
                "id": "doi:10.5678 omid:br/0612346",
                "title": "Only in Existing",
                "author": "Author 2",
                "pub_date": "2024",
                "venue": "",
                "volume": "",
                "issue": "",
                "page": "",
                "type": "",
                "publisher": "",
                "editor": ""
            }
        ]

        new_data = [
            {
                "id": "pmid:999 omid:br/0612345",  # Same OMID as in existing
                "title": "New Version",
                "author": "Author 1 Updated",
                "pub_date": "2024",
                "venue": "",
                "volume": "",
                "issue": "",
                "page": "",
                "type": "",
                "publisher": "",
                "editor": ""
            }
        ]

        # Create input files
        self.create_test_csv(self.existing_dir, "existing.csv", existing_data)
        self.create_test_csv(self.new_dir, "new.csv", new_data)

        # Pre-create the new output file to simulate it's already been processed (cached)
        cached_new_data = [
            {
                "id": "pmid:999 omid:br/0612345",
                "title": "Previously Processed New Version",
                "author": "Author 1 Cached",
                "pub_date": "2024",
                "venue": "",
                "volume": "",
                "issue": "",
                "page": "",
                "type": "",
                "publisher": "",
                "editor": ""
            }
        ]
        self.create_test_csv(self.output_dir, "new.csv", cached_new_data)

        # Create a mock merger that simulates empty SPARQL results (no data found)
        # This simulates that omid:br/0612345 is excluded from the query
        with patch.object(CSVDumpMerger, 'execute_sparql_query') as mock_execute:
            # Return empty results since omid:br/0612345 should be excluded
            mock_execute.return_value = []

            merger = CSVDumpMerger("http://example.com/sparql", batch_size=10)
            merger.merge_dumps(self.existing_dir, self.new_dir, self.output_dir, max_workers=1, verbose_diff=False)

        # Check that new file remains cached
        new_output = os.path.join(self.output_dir, "new.csv")
        with open(new_output, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            new_rows = list(reader)

        self.assertEqual(len(new_rows), 1)
        self.assertEqual(new_rows[0]['title'], 'Previously Processed New Version')  # Should remain cached

        # Check existing file output - should be empty or have no output file
        # because all OMIDs were excluded or no data was found
        existing_output = os.path.join(self.output_dir, "existing.csv")
        if os.path.exists(existing_output):
            with open(existing_output, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                existing_rows = list(reader)
            # If file exists, it should be empty or contain only omid:br/0612346
            # But since we mocked empty results, likely no file was created
            self.assertEqual(len(existing_rows), 0)
        else:
            # No existing output file created because no valid data was found
            pass  # This is also acceptable behavior
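

# ---------------------------------------------------------------------------
# Illustrative sketch (NOT part of oc_meta): the integration tests above assume
# that rows in existing files whose OMID also appears in any new file are
# dropped, so the new dump takes precedence. A minimal sketch of that exclusion
# step, with extract_omid standing in for the merger's own extractor:
def _sketch_exclude_new_omids(existing_rows, new_file_omids, extract_omid):
    """Keep only existing rows whose OMID is not claimed by a new file."""
    return [row for row in existing_rows
            if extract_omid(row.get("id", "")) not in new_file_omids]
# ---------------------------------------------------------------------------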


class TestPostProcessingFunctions(unittest.TestCase):
    """Test utility functions for post-processing"""

    def test_postprocess_type(self):
        """Test type URI to string conversion"""
        test_cases = [
            ("http://purl.org/spar/fabio/JournalArticle", "journal article"),
            ("http://purl.org/spar/fabio/Book", "book"),
            ("http://purl.org/spar/fabio/BookChapter", "book chapter"),
            ("http://purl.org/spar/fabio/UnknownType", "http://purl.org/spar/fabio/UnknownType"),
            ("", ""),
            (None, "")
        ]

        for type_uri, expected in test_cases:
            with self.subTest(type_uri=type_uri):
                result = postprocess_type(type_uri)
                self.assertEqual(result, expected)

    def test_process_ordered_list_empty(self):
        """Test process_ordered_list with empty input"""
        self.assertEqual(process_ordered_list(""), "")
        self.assertEqual(process_ordered_list(None), None)

    def test_process_ordered_list_simple(self):
        """Test process_ordered_list with simple ordered data"""
        # Simple case: Author 1 -> Author 2 -> Author 3
        input_data = "Author 1:role1:role2|Author 2:role2:role3|Author 3:role3:"
        expected = "Author 1; Author 2; Author 3"
        result = process_ordered_list(input_data)
        self.assertEqual(result, expected)

    def test_process_ordered_list_circular_reference(self):
        """Test process_ordered_list with circular references (prevents infinite loop)"""
        # Circular case: Author 1 -> Author 2 -> Author 3 -> Author 1
        input_data = "Author 1:role1:role2|Author 2:role2:role3|Author 3:role3:role1"

        # Should stop at circular reference and only include unique items
        with patch('oc_meta.run.merge_csv_dumps.logger') as mock_logger:
            result = process_ordered_list(input_data)

            # Should have stopped at circular reference
            expected = "Author 1; Author 2; Author 3"
            self.assertEqual(result, expected)

            # Should have logged a warning about circular reference
            mock_logger.warning.assert_called_once()
            self.assertIn("Circular reference detected", mock_logger.warning.call_args[0][0])

    def test_process_ordered_list_long_chain_protection(self):
        """Test process_ordered_list with artificially long chain (max iterations protection)"""
        # Create a very long chain that would exceed reasonable limits
        # Use 100 items with max_iterations = 100 * 2 = 200, so all should be processed
        # But we'll mock a smaller max_iterations to trigger the protection
        items = []
        for i in range(100):  # Create 100 items in sequence
            next_role = f"role{i+1}" if i < 99 else ""
            items.append(f"Author {i}:role{i}:{next_role}")

        input_data = "|".join(items)

        # Temporarily modify the max_iterations calculation by mocking it
        with patch('oc_meta.run.merge_csv_dumps.logger') as mock_logger:
            # We'll create a scenario where max_iterations is artificially small
            # by patching the logic or creating a controlled test

            # Let's create a simpler test: create 10 items but set a very small limit
            simple_items = []
            for i in range(10):
                next_role = f"role{i+1}" if i < 9 else ""
                simple_items.append(f"Author {i}:role{i}:{next_role}")

            simple_input = "|".join(simple_items)

            # Mock the function to have a small max_iterations
            original_func = process_ordered_list

            def limited_process_ordered_list(items_str):
                if not items_str:
                    return items_str
                items_dict = {}
                role_to_name = {}
                for item in items_str.split('|'):
                    parts = item.split(':')
                    if len(parts) >= 3:
                        name = ':'.join(parts[:-2])
                        current_role = parts[-2]
                        next_role = parts[-1] if parts[-1] != '' else None
                        items_dict[current_role] = next_role
                        role_to_name[current_role] = name

                if not items_dict:
                    return items_str

                ordered_items = []
                visited_roles = set()
                max_iterations = 5  # Artificially small limit for testing

                start_roles = [role for role in items_dict.keys() if role not in items_dict.values()]
                if not start_roles:
                    start_role = next(iter(items_dict.keys()))
                else:
                    start_role = start_roles[0]

                current_role = start_role
                iteration_count = 0

                while current_role and current_role in role_to_name and iteration_count < max_iterations:
                    if current_role in visited_roles:
                        mock_logger.warning(f"Circular reference detected in role chain at role: {current_role}")
                        break

                    visited_roles.add(current_role)
                    ordered_items.append(role_to_name[current_role])
                    current_role = items_dict.get(current_role, '')
                    iteration_count += 1

                if iteration_count >= max_iterations:
                    mock_logger.warning(f"Maximum iterations reached ({max_iterations}) in process_ordered_list, possible infinite loop prevented")

                return "; ".join(ordered_items)

            result = limited_process_ordered_list(simple_input)

            # Should have stopped due to max iterations limit (5)
            result_items = result.split("; ")
            self.assertEqual(len(result_items), 5)  # Should be limited to 5

            # Should have logged a warning about max iterations
            mock_logger.warning.assert_called()
            warning_calls = [call for call in mock_logger.warning.call_args_list
                             if "Maximum iterations reached" in str(call)]
            self.assertTrue(len(warning_calls) > 0)

    def test_process_ordered_list_self_reference(self):
        """Test process_ordered_list with immediate self-reference"""
        # Self-referencing case: Author 1 -> Author 1
        input_data = "Author 1:role1:role1"

        with patch('oc_meta.run.merge_csv_dumps.logger') as mock_logger:
            result = process_ordered_list(input_data)

            # Should include the item once and detect circular reference
            expected = "Author 1"
            self.assertEqual(result, expected)

            # Should have logged a warning about circular reference
            mock_logger.warning.assert_called_once()
            self.assertIn("Circular reference detected", mock_logger.warning.call_args[0][0])

    def test_process_ordered_list_complex_circular(self):
        """Test process_ordered_list with complex circular pattern"""
        # Complex circular case: A -> B -> C -> D -> B (creates loop at B)
        input_data = "Author A:roleA:roleB|Author B:roleB:roleC|Author C:roleC:roleD|Author D:roleD:roleB"

        with patch('oc_meta.run.merge_csv_dumps.logger') as mock_logger:
            result = process_ordered_list(input_data)

            # Should process A -> B -> C -> D and then detect circular reference at B
            expected = "Author A; Author B; Author C; Author D"
            self.assertEqual(result, expected)

            # Should have logged a warning about circular reference
            mock_logger.warning.assert_called_once()
            self.assertIn("Circular reference detected", mock_logger.warning.call_args[0][0])
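

# ---------------------------------------------------------------------------
# Illustrative sketch (NOT part of oc_meta): test_postprocess_type above implies
# that known fabio type IRIs map to lower-case labels, unknown IRIs pass through
# unchanged, and empty/None become "". For example:
_SKETCH_TYPE_LABELS = {
    "http://purl.org/spar/fabio/JournalArticle": "journal article",
    "http://purl.org/spar/fabio/Book": "book",
    "http://purl.org/spar/fabio/BookChapter": "book chapter",
}


def _sketch_postprocess_type(type_uri):
    """Map a fabio type IRI to its label; unknown IRIs pass through, None/'' -> ''."""
    if not type_uri:
        return ""
    return _SKETCH_TYPE_LABELS.get(type_uri, type_uri)
# ---------------------------------------------------------------------------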


if __name__ == '__main__':
    unittest.main()