Coverage for test/preprocess_input_test.py: 95%
262 statements
coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

import csv
import os
import shutil
import tempfile
import unittest
from unittest.mock import MagicMock, patch

import redis
from oc_meta.run.meta.preprocess_input import process_csv_file


class MockSPARQLResponse:
    def __init__(self, boolean_value):
        self.boolean_value = boolean_value

    def convert(self):
        return {'boolean': self.boolean_value}
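
# Note: MockSPARQLResponse models only the part of a SPARQLWrapper query result that the
# code under test presumably reads: convert() on an ASK query yields a JSON dict with a
# 'boolean' key. Nothing else of the real response object is reproduced here.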


class TestPreprocessInput(unittest.TestCase):
    def setUp(self):
        self.test_dir = tempfile.mkdtemp(dir='.')
        self.output_dir = tempfile.mkdtemp(dir='.')

        # Create Redis connection for testing (using DB 5)
        self.redis_client = redis.Redis(host='localhost', port=6379, db=5, decode_responses=True)

        # Add some test data to Redis
        self.redis_client.set('doi:10.1007/978-3-662-07918-8_3', '1')
        self.redis_client.set('doi:10.1016/0021-9991(73)90147-2', '1')
        self.redis_client.set('doi:10.1109/20.877674', '1')

        self.sparql_endpoint = "http://example.org/sparql"

        self.existing_dois_in_sparql = [
            'doi:10.1007/978-3-662-07918-8_3',
            'doi:10.1016/0021-9991(73)90147-2',
            'doi:10.1109/20.877674'
        ]
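
        # The endpoint above is only a placeholder URL: every SPARQL-backed test patches
        # check_ids_existence_sparql (or SPARQLWrapper itself), so no live endpoint is contacted.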

    def tearDown(self):
        shutil.rmtree(self.test_dir)
        shutil.rmtree(self.output_dir)
        self.redis_client.flushdb()
        self.redis_client.close()

    def mock_sparql_query(self, endpoint, query, id_str):
        """Mock for SPARQL query execution - check if ID exists in our test list"""
        if id_str in self.existing_dois_in_sparql:
            return MockSPARQLResponse(True)
        return MockSPARQLResponse(False)
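
    # Helper standing in for a single ID lookup against the SPARQL endpoint: the endpoint and
    # query text are ignored, and the returned MockSPARQLResponse simply reflects whether the
    # ID appears in self.existing_dois_in_sparql.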

    def test_process_real_metadata_redis(self):
        """Test processing metadata with Redis lookup"""
        real_data_path = os.path.join(self.test_dir, 'real_metadata.csv')

        # These DOIs exist in our Redis test DB
        real_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties, State, and Electrodes on Electric Strength","Ushakov, Vasily Y.","2004","Insulation of High-Voltage Equipment [isbn:9783642058530 isbn:9783662079188]",,,"27-82","book chapter","Springer Science and Business Media LLC [crossref:297]",
"doi:10.1016/0021-9991(73)90147-2","Flux-corrected transport. I. SHASTA, a fluid transport algorithm that works","Boris, Jay P; Book, David L","1973-01","Journal of Computational Physics [issn:0021-9991]","11","1","38-69","journal article","Elsevier BV [crossref:78]",
"doi:10.1109/20.877674","An investigation of FEM-FCT method for streamer corona simulation","Woong-Gee Min, ; Hyeong-Seok Kim, ; Seok-Hyun Lee, ; Song-Yop Hahn, ","2000-07","IEEE Transactions on Magnetics [issn:0018-9464]","36","4","1280-1284","journal article","Institute of Electrical and Electronics Engineers (IEEE) [crossref:263]",'''

        with open(real_data_path, 'w', encoding='utf-8') as f:
            f.write(real_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            real_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Since all DOIs exist in Redis, no file should be created
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 0)
        self.assertEqual(next_file_num, 0)
        self.assertEqual(stats.processed_rows, 0)
        self.assertEqual(stats.existing_ids_rows, 3)  # All 3 rows exist in Redis
        self.assertEqual(len(pending_rows), 0)  # No pending rows

    @patch('oc_meta.run.meta.preprocess_input.SPARQLWrapper')
    def test_process_real_metadata_sparql(self, mock_sparql_wrapper):
        """Test processing metadata with SPARQL lookup"""
        mock_instance = MagicMock()
        mock_sparql_wrapper.return_value = mock_instance

        mock_response = MockSPARQLResponse(True)
        mock_instance.query.return_value = mock_response

        real_data_path = os.path.join(self.test_dir, 'real_metadata_sparql.csv')

        # These DOIs are configured to exist in our mocked SPARQL endpoint
        real_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties, State, and Electrodes on Electric Strength","Ushakov, Vasily Y.","2004","Insulation of High-Voltage Equipment [isbn:9783642058530 isbn:9783662079188]",,,"27-82","book chapter","Springer Science and Business Media LLC [crossref:297]",
"doi:10.1016/0021-9991(73)90147-2","Flux-corrected transport. I. SHASTA, a fluid transport algorithm that works","Boris, Jay P; Book, David L","1973-01","Journal of Computational Physics [issn:0021-9991]","11","1","38-69","journal article","Elsevier BV [crossref:78]",
"doi:10.1109/20.877674","An investigation of FEM-FCT method for streamer corona simulation","Woong-Gee Min, ; Hyeong-Seok Kim, ; Seok-Hyun Lee, ; Song-Yop Hahn, ","2000-07","IEEE Transactions on Magnetics [issn:0018-9464]","36","4","1280-1284","journal article","Institute of Electrical and Electronics Engineers (IEEE) [crossref:263]",'''

        with open(real_data_path, 'w', encoding='utf-8') as f:
            f.write(real_metadata)

        with patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql') as mock_check:
            def side_effect(ids, endpoint):
                if not ids:
                    return False

                id_list = ids.split()

                # We'll test both the scheme and value for each ID
                for id_str in id_list:
                    parts = id_str.split(":", 1)
                    scheme = parts[0]
                    value = parts[1]

                    # Make sure both scheme and value are extracted correctly
                    if scheme != "doi" or not value.startswith("10."):
                        return False

                    # Check if the full ID is in our list of valid IDs
                    if id_str not in self.existing_dois_in_sparql:
                        return False

                return True

            mock_check.side_effect = side_effect

            next_file_num, stats, pending_rows = process_csv_file(
                real_data_path,
                self.output_dir,
                0,
                storage_type='sparql',
                storage_reference=self.sparql_endpoint
            )

            # Since all DOIs are mocked to exist in SPARQL, no file should be created
            output_files = os.listdir(self.output_dir)
            self.assertEqual(len(output_files), 0)
            self.assertEqual(next_file_num, 0)
            self.assertEqual(stats.processed_rows, 0)
            self.assertEqual(stats.existing_ids_rows, 3)  # All 3 rows exist in mocked SPARQL
            self.assertEqual(len(pending_rows), 0)  # No pending rows

    def test_process_mixed_metadata_redis(self):
        """Test processing metadata with both existing and non-existing DOIs in Redis"""
        mixed_data_path = os.path.join(self.test_dir, 'mixed_metadata.csv')

        # Mix of existing DOIs, non-existing DOIs and empty IDs
        mixed_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties","Author 1","2004","Venue 1",,,"27-82","book chapter","Publisher 1",
"","Spatial Distribution of Ion Current","Author 2","2012-01","Venue 2","27","1","380-390","journal article","Publisher 2",
"doi:10.INVALID/123456789","Invalid DOI","Author 3","1980-01-14","Venue 3","13","1","3-6","journal article","Publisher 3",'''

        with open(mixed_data_path, 'w', encoding='utf-8') as f:
            f.write(mixed_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            mixed_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Should create one file with rows having empty IDs or non-existing DOIs
        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        # Verify stats
        self.assertEqual(stats.total_rows, 3)
        self.assertEqual(stats.existing_ids_rows, 1)  # One existing DOI
        self.assertEqual(stats.processed_rows, 2)  # Two rows should be processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 2)  # Should contain empty ID row and invalid DOI row
            self.assertTrue(any(row['id'] == '' for row in rows))
            self.assertTrue(any(row['id'] == 'doi:10.INVALID/123456789' for row in rows))

    @patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql')
    def test_process_mixed_metadata_sparql(self, mock_check):
        """Test processing metadata with both existing and non-existing DOIs in SPARQL"""
        # Mock the check_ids_existence_sparql function
        def side_effect(ids, endpoint):
            if not ids:
                return False

            id_list = ids.split()

            # We'll test both the scheme and value for each ID
            for id_str in id_list:
                parts = id_str.split(":", 1)
                scheme = parts[0]
                value = parts[1]

                # Make sure both scheme and value are extracted correctly
                if scheme != "doi" or not value.startswith("10."):
                    continue

                # Check if the full ID is in our list of valid IDs
                if id_str not in self.existing_dois_in_sparql:
                    return False

            return True

        mock_check.side_effect = side_effect

        mixed_data_path = os.path.join(self.test_dir, 'mixed_metadata_sparql.csv')

        # Mix of existing DOIs, non-existing DOIs and empty IDs
        mixed_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties","Author 1","2004","Venue 1",,,"27-82","book chapter","Publisher 1",
"","Spatial Distribution of Ion Current","Author 2","2012-01","Venue 2","27","1","380-390","journal article","Publisher 2",
"doi:10.INVALID/123456789","Invalid DOI","Author 3","1980-01-14","Venue 3","13","1","3-6","journal article","Publisher 3",'''

        with open(mixed_data_path, 'w', encoding='utf-8') as f:
            f.write(mixed_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            mixed_data_path,
            self.output_dir,
            0,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Should create one file with rows having empty IDs or non-existing DOIs
        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        # Verify stats
        self.assertEqual(stats.total_rows, 3)
        self.assertEqual(stats.existing_ids_rows, 1)  # One existing DOI
        self.assertEqual(stats.processed_rows, 2)  # Two rows should be processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 2)  # Should contain empty ID row and invalid DOI row
            self.assertTrue(any(row['id'] == '' for row in rows))
            self.assertTrue(any(row['id'] == 'doi:10.INVALID/123456789' for row in rows))

    def test_process_duplicate_rows(self):
        """Test that duplicate rows are properly filtered out"""
        test_data_path = os.path.join(self.test_dir, 'duplicate_data.csv')

        test_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        with open(test_data_path, 'w', encoding='utf-8') as f:
            f.write(test_data)

        next_file_num, stats, pending_rows = process_csv_file(
            test_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        self.assertEqual(stats.total_rows, 5)
        self.assertEqual(stats.duplicate_rows, 3)  # Three duplicate rows
        self.assertEqual(stats.processed_rows, 2)  # Two unique rows processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

            self.assertEqual(len(rows), 2)
            unique_ids = set(row['id'] for row in rows)
            self.assertEqual(len(unique_ids), 2)
            self.assertIn('doi:10.INVALID/123', unique_ids)
            self.assertIn('doi:10.INVALID/456', unique_ids)

    def test_cross_file_deduplication_redis(self):
        """Test that duplicate rows are filtered across different files using Redis"""
        # Create first file with some data
        file1_path = os.path.join(self.test_dir, 'data1.csv')
        file1_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        # Create second file with some duplicates from first file
        file2_path = os.path.join(self.test_dir, 'data2.csv')
        file2_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/789","New Title","New Author","2024","Test Venue","1","1","21-30","journal article","Test Publisher",'''

        with open(file1_path, 'w', encoding='utf-8') as f:
            f.write(file1_data)
        with open(file2_path, 'w', encoding='utf-8') as f:
            f.write(file2_data)

        # Process both files using the same seen_rows set and pending_rows list
        seen_rows = set()
        pending_rows = []
        next_file_num, stats1, pending_rows = process_csv_file(
            file1_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )
        next_file_num, stats2, pending_rows = process_csv_file(
            file2_path,
            self.output_dir,
            next_file_num,
            storage_type='redis',
            storage_reference=self.redis_client,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )

        # Write final pending rows
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Verify statistics
        self.assertEqual(stats1.total_rows, 2)
        self.assertEqual(stats1.duplicate_rows, 0)
        self.assertEqual(stats1.processed_rows, 2)

        self.assertEqual(stats2.total_rows, 2)
        self.assertEqual(stats2.duplicate_rows, 1)  # One row should be detected as duplicate
        self.assertEqual(stats2.processed_rows, 1)  # Only one new row should be processed

        # Check output files
        output_files = sorted(os.listdir(self.output_dir))
        self.assertEqual(len(output_files), 1)  # Should create only one file

        # Verify final output contains only unique rows
        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 3)  # Should have 3 unique rows total
            unique_ids = set(row['id'] for row in rows)
            self.assertEqual(len(unique_ids), 3)
            self.assertIn('doi:10.INVALID/123', unique_ids)
            self.assertIn('doi:10.INVALID/456', unique_ids)
            self.assertIn('doi:10.INVALID/789', unique_ids)

    @patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql')
    def test_cross_file_deduplication_sparql(self, mock_check):
        """Test that duplicate rows are filtered across different files using SPARQL"""
        # Mock the check_ids_existence_sparql function
        def side_effect(ids, endpoint):
            if not ids:
                return False

            id_list = ids.split()

            # We'll test both the scheme and value for each ID
            for id_str in id_list:
                parts = id_str.split(":", 1)
                scheme = parts[0]
                value = parts[1]

                # Make sure both scheme and value are extracted correctly
                if scheme != "doi" or not value.startswith("10."):
                    continue

                # Check if the full ID is in our list of valid IDs
                if id_str not in self.existing_dois_in_sparql:
                    return False

            return True

        mock_check.side_effect = side_effect

        # Create first file with some data
        file1_path = os.path.join(self.test_dir, 'data1_sparql.csv')
        file1_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        # Create second file with some duplicates from first file
        file2_path = os.path.join(self.test_dir, 'data2_sparql.csv')
        file2_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/789","New Title","New Author","2024","Test Venue","1","1","21-30","journal article","Test Publisher",'''

        with open(file1_path, 'w', encoding='utf-8') as f:
            f.write(file1_data)
        with open(file2_path, 'w', encoding='utf-8') as f:
            f.write(file2_data)

        # Process both files using the same seen_rows set and pending_rows list
        seen_rows = set()
        pending_rows = []
        next_file_num, stats1, pending_rows = process_csv_file(
            file1_path,
            self.output_dir,
            0,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )
        next_file_num, stats2, pending_rows = process_csv_file(
            file2_path,
            self.output_dir,
            next_file_num,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )

        # Write final pending rows
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Verify statistics
        self.assertEqual(stats1.total_rows, 2)
        self.assertEqual(stats1.duplicate_rows, 0)
        self.assertEqual(stats1.processed_rows, 2)

        self.assertEqual(stats2.total_rows, 2)
        self.assertEqual(stats2.duplicate_rows, 1)  # One row should be detected as duplicate
        self.assertEqual(stats2.processed_rows, 1)  # Only one new row should be processed

        # Check output files
        output_files = sorted(os.listdir(self.output_dir))
        self.assertEqual(len(output_files), 1)  # Should create only one file

        # Verify final output contains only unique rows
        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 3)  # Should have 3 unique rows total
            unique_ids = set(row['id'] for row in rows)
            self.assertEqual(len(unique_ids), 3)
            self.assertIn('doi:10.INVALID/123', unique_ids)
            self.assertIn('doi:10.INVALID/456', unique_ids)
            self.assertIn('doi:10.INVALID/789', unique_ids)
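

# A minimal, hypothetical driver sketch (not part of the test suite) showing how the call
# pattern exercised above would scale to a directory of input CSVs: the same seen_rows set
# and pending_rows list are threaded through every process_csv_file call, and any leftover
# pending rows are flushed to '<next_file_num>.csv' at the end, exactly as the tests do.
# The helper name and its arguments are illustrative only.
def _example_preprocess_directory(input_dir, output_dir, redis_client):
    seen_rows = set()
    pending_rows = []
    next_file_num = 0
    for name in sorted(os.listdir(input_dir)):
        if not name.endswith('.csv'):
            continue
        next_file_num, _stats, pending_rows = process_csv_file(
            os.path.join(input_dir, name),
            output_dir,
            next_file_num,
            storage_type='redis',
            storage_reference=redis_client,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )
    # Flush whatever is still pending after the last input file
    if pending_rows:
        output_file = os.path.join(output_dir, f"{next_file_num}.csv")
        with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
            writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
            writer.writeheader()
            writer.writerows(pending_rows)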


if __name__ == '__main__':
    unittest.main()