Coverage for test/preprocess_input_test.py: 95% (257 statements)
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

import csv
import os
import shutil
import tempfile
import unittest
from unittest.mock import MagicMock, patch

import redis

from oc_meta.run.meta.preprocess_input import process_csv_file

class MockSPARQLResponse:
    """Minimal stand-in for a SPARQL query result: convert() returns a dict with a
    'boolean' flag, mirroring the shape of a SPARQLWrapper ASK response."""

    def __init__(self, boolean_value):
        self.boolean_value = boolean_value

    def convert(self):
        return {'boolean': self.boolean_value}
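
# The tests below assume a local Redis instance reachable at localhost:6381
# (db 5 is used and flushed after every test); SPARQL existence checks are
# mocked via unittest.mock.patch rather than sent to a real endpoint.
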
class TestPreprocessInput(unittest.TestCase):
    def setUp(self):
        self.test_dir = tempfile.mkdtemp(dir='.')
        self.output_dir = tempfile.mkdtemp(dir='.')

        self.redis_client = redis.Redis(host='localhost', port=6381, db=5, decode_responses=True)

        # Add some test data to Redis
        self.redis_client.set('doi:10.1007/978-3-662-07918-8_3', '1')
        self.redis_client.set('doi:10.1016/0021-9991(73)90147-2', '1')
        self.redis_client.set('doi:10.1109/20.877674', '1')

        self.sparql_endpoint = "http://example.org/sparql"

        self.existing_dois_in_sparql = [
            'doi:10.1007/978-3-662-07918-8_3',
            'doi:10.1016/0021-9991(73)90147-2',
            'doi:10.1109/20.877674'
        ]

    def tearDown(self):
        shutil.rmtree(self.test_dir)
        shutil.rmtree(self.output_dir)
        self.redis_client.flushdb()
        self.redis_client.close()

    def mock_sparql_query(self, endpoint, query, id_str):
        """Mock for SPARQL query execution - check if ID exists in our test list"""
        if id_str in self.existing_dois_in_sparql:
            return MockSPARQLResponse(True)
        return MockSPARQLResponse(False)
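    # Note: mock_sparql_query above is not referenced by the tests in this file;
    # they patch oc_meta.run.meta.preprocess_input.check_ids_existence_sparql directly.
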
    def test_process_real_metadata_redis(self):
        """Test processing metadata with Redis lookup"""
        real_data_path = os.path.join(self.test_dir, 'real_metadata.csv')

        # These DOIs exist in our Redis test DB
        real_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties, State, and Electrodes on Electric Strength","Ushakov, Vasily Y.","2004","Insulation of High-Voltage Equipment [isbn:9783642058530 isbn:9783662079188]",,,"27-82","book chapter","Springer Science and Business Media LLC [crossref:297]",
"doi:10.1016/0021-9991(73)90147-2","Flux-corrected transport. I. SHASTA, a fluid transport algorithm that works","Boris, Jay P; Book, David L","1973-01","Journal of Computational Physics [issn:0021-9991]","11","1","38-69","journal article","Elsevier BV [crossref:78]",
"doi:10.1109/20.877674","An investigation of FEM-FCT method for streamer corona simulation","Woong-Gee Min, ; Hyeong-Seok Kim, ; Seok-Hyun Lee, ; Song-Yop Hahn, ","2000-07","IEEE Transactions on Magnetics [issn:0018-9464]","36","4","1280-1284","journal article","Institute of Electrical and Electronics Engineers (IEEE) [crossref:263]",'''

        with open(real_data_path, 'w', encoding='utf-8') as f:
            f.write(real_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            real_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Since all DOIs exist in Redis, no file should be created
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 0)
        self.assertEqual(next_file_num, 0)
        self.assertEqual(stats.processed_rows, 0)
        self.assertEqual(stats.existing_ids_rows, 3)  # All 3 rows exist in Redis
        self.assertEqual(len(pending_rows), 0)  # No pending rows

    def test_process_real_metadata_sparql(self):
        """Test processing metadata with SPARQL lookup"""
        real_data_path = os.path.join(self.test_dir, 'real_metadata_sparql.csv')

        # These DOIs are configured to exist in our mocked SPARQL endpoint
        real_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties, State, and Electrodes on Electric Strength","Ushakov, Vasily Y.","2004","Insulation of High-Voltage Equipment [isbn:9783642058530 isbn:9783662079188]",,,"27-82","book chapter","Springer Science and Business Media LLC [crossref:297]",
"doi:10.1016/0021-9991(73)90147-2","Flux-corrected transport. I. SHASTA, a fluid transport algorithm that works","Boris, Jay P; Book, David L","1973-01","Journal of Computational Physics [issn:0021-9991]","11","1","38-69","journal article","Elsevier BV [crossref:78]",
"doi:10.1109/20.877674","An investigation of FEM-FCT method for streamer corona simulation","Woong-Gee Min, ; Hyeong-Seok Kim, ; Seok-Hyun Lee, ; Song-Yop Hahn, ","2000-07","IEEE Transactions on Magnetics [issn:0018-9464]","36","4","1280-1284","journal article","Institute of Electrical and Electronics Engineers (IEEE) [crossref:263]",'''

        with open(real_data_path, 'w', encoding='utf-8') as f:
            f.write(real_metadata)

        with patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql') as mock_check:
            def side_effect(ids, endpoint):
                if not ids:
                    return False

                id_list = ids.split()

                # We'll test both the scheme and value for each ID
                for id_str in id_list:
                    parts = id_str.split(":", 1)
                    scheme = parts[0]
                    value = parts[1]

                    # Make sure both scheme and value are extracted correctly
                    if scheme != "doi" or not value.startswith("10."):
                        return False

                    # Check if the full ID is in our list of valid IDs
                    if id_str not in self.existing_dois_in_sparql:
                        return False

                return True

            mock_check.side_effect = side_effect

            next_file_num, stats, pending_rows = process_csv_file(
                real_data_path,
                self.output_dir,
                0,
                storage_type='sparql',
                storage_reference=self.sparql_endpoint
            )

            # Since all DOIs are mocked to exist in SPARQL, no file should be created
            output_files = os.listdir(self.output_dir)
            self.assertEqual(len(output_files), 0)
            self.assertEqual(next_file_num, 0)
            self.assertEqual(stats.processed_rows, 0)
            self.assertEqual(stats.existing_ids_rows, 3)  # All 3 rows exist in mocked SPARQL
            self.assertEqual(len(pending_rows), 0)  # No pending rows

    def test_process_mixed_metadata_redis(self):
        """Test processing metadata with both existing and non-existing DOIs in Redis"""
        mixed_data_path = os.path.join(self.test_dir, 'mixed_metadata.csv')

        # Mix of existing DOIs, non-existing DOIs and empty IDs
        mixed_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties","Author 1","2004","Venue 1",,,"27-82","book chapter","Publisher 1",
"","Spatial Distribution of Ion Current","Author 2","2012-01","Venue 2","27","1","380-390","journal article","Publisher 2",
"doi:10.INVALID/123456789","Invalid DOI","Author 3","1980-01-14","Venue 3","13","1","3-6","journal article","Publisher 3",'''

        with open(mixed_data_path, 'w', encoding='utf-8') as f:
            f.write(mixed_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            mixed_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Should create one file with rows having empty IDs or non-existing DOIs
        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        # Verify stats
        self.assertEqual(stats.total_rows, 3)
        self.assertEqual(stats.existing_ids_rows, 1)  # One existing DOI
        self.assertEqual(stats.processed_rows, 2)  # Two rows should be processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 2)  # Should contain empty ID row and invalid DOI row
            self.assertTrue(any(row['id'] == '' for row in rows))
            self.assertTrue(any(row['id'] == 'doi:10.INVALID/123456789' for row in rows))

    @patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql')
    def test_process_mixed_metadata_sparql(self, mock_check):
        """Test processing metadata with both existing and non-existing DOIs in SPARQL"""
        # Mock the check_ids_existence_sparql function
        def side_effect(ids, endpoint):
            if not ids:
                return False

            id_list = ids.split()

            # We'll test both the scheme and value for each ID
            for id_str in id_list:
                parts = id_str.split(":", 1)
                scheme = parts[0]
                value = parts[1]

                # Make sure both scheme and value are extracted correctly
                if scheme != "doi" or not value.startswith("10."):
                    continue

                # Check if the full ID is in our list of valid IDs
                if id_str not in self.existing_dois_in_sparql:
                    return False

            return True

        mock_check.side_effect = side_effect

        mixed_data_path = os.path.join(self.test_dir, 'mixed_metadata_sparql.csv')

        # Mix of existing DOIs, non-existing DOIs and empty IDs
        mixed_metadata = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.1007/978-3-662-07918-8_3","Influence of Dielectric Properties","Author 1","2004","Venue 1",,,"27-82","book chapter","Publisher 1",
"","Spatial Distribution of Ion Current","Author 2","2012-01","Venue 2","27","1","380-390","journal article","Publisher 2",
"doi:10.INVALID/123456789","Invalid DOI","Author 3","1980-01-14","Venue 3","13","1","3-6","journal article","Publisher 3",'''

        with open(mixed_data_path, 'w', encoding='utf-8') as f:
            f.write(mixed_metadata)

        next_file_num, stats, pending_rows = process_csv_file(
            mixed_data_path,
            self.output_dir,
            0,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Should create one file with rows having empty IDs or non-existing DOIs
        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        # Verify stats
        self.assertEqual(stats.total_rows, 3)
        self.assertEqual(stats.existing_ids_rows, 1)  # One existing DOI
        self.assertEqual(stats.processed_rows, 2)  # Two rows should be processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 2)  # Should contain empty ID row and invalid DOI row
            self.assertTrue(any(row['id'] == '' for row in rows))
            self.assertTrue(any(row['id'] == 'doi:10.INVALID/123456789' for row in rows))

    def test_process_duplicate_rows(self):
        """Test that duplicate rows are properly filtered out"""
        test_data_path = os.path.join(self.test_dir, 'duplicate_data.csv')

        test_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        with open(test_data_path, 'w', encoding='utf-8') as f:
            f.write(test_data)

        next_file_num, stats, pending_rows = process_csv_file(
            test_data_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client
        )

        # Write pending rows if any
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        self.assertEqual(next_file_num, 0)  # File number shouldn't increment until ROWS_PER_FILE
        self.assertEqual(stats.total_rows, 5)
        self.assertEqual(stats.duplicate_rows, 3)  # Three duplicate rows
        self.assertEqual(stats.processed_rows, 2)  # Two unique rows processed
        self.assertEqual(len(pending_rows), 2)  # Two rows pending

        output_files = os.listdir(self.output_dir)
        self.assertEqual(len(output_files), 1)

        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

            self.assertEqual(len(rows), 2)
            unique_ids = set(row['id'] for row in rows)
            self.assertEqual(len(unique_ids), 2)
            self.assertIn('doi:10.INVALID/123', unique_ids)
            self.assertIn('doi:10.INVALID/456', unique_ids)

    def test_cross_file_deduplication_redis(self):
        """Test that duplicate rows are filtered across different files using Redis"""
        # Create the first file with some data
        file1_path = os.path.join(self.test_dir, 'data1.csv')
        file1_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        # Create a second file with some duplicates of rows from the first file
        file2_path = os.path.join(self.test_dir, 'data2.csv')
        file2_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/789","New Title","New Author","2024","Test Venue","1","1","21-30","journal article","Test Publisher",'''

        with open(file1_path, 'w', encoding='utf-8') as f:
            f.write(file1_data)
        with open(file2_path, 'w', encoding='utf-8') as f:
            f.write(file2_data)

        # Process both files using the same seen_rows set and pending_rows list
        seen_rows = set()
        pending_rows = []
        next_file_num, stats1, pending_rows = process_csv_file(
            file1_path,
            self.output_dir,
            0,
            storage_type='redis',
            storage_reference=self.redis_client,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )
        next_file_num, stats2, pending_rows = process_csv_file(
            file2_path,
            self.output_dir,
            next_file_num,
            storage_type='redis',
            storage_reference=self.redis_client,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )

        # Write final pending rows
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Verify statistics
        self.assertEqual(stats1.total_rows, 2)
        self.assertEqual(stats1.duplicate_rows, 0)
        self.assertEqual(stats1.processed_rows, 2)

        self.assertEqual(stats2.total_rows, 2)
        self.assertEqual(stats2.duplicate_rows, 1)  # One row should be detected as duplicate
        self.assertEqual(stats2.processed_rows, 1)  # Only one new row should be processed

        # Check output files
        output_files = sorted(os.listdir(self.output_dir))
        self.assertEqual(len(output_files), 1)  # Should create only one file

        # Verify final output contains only unique rows
        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 3)  # Should have 3 unique rows total
            unique_ids = set(row['id'] for row in rows)
            self.assertEqual(len(unique_ids), 3)
            self.assertIn('doi:10.INVALID/123', unique_ids)
            self.assertIn('doi:10.INVALID/456', unique_ids)
            self.assertIn('doi:10.INVALID/789', unique_ids)

    @patch('oc_meta.run.meta.preprocess_input.check_ids_existence_sparql')
    def test_cross_file_deduplication_sparql(self, mock_check):
        """Test that duplicate rows are filtered across different files using SPARQL"""
        # Mock the check_ids_existence_sparql function
        def side_effect(ids, endpoint):
            if not ids:
                return False

            id_list = ids.split()

            # We'll test both the scheme and value for each ID
            for id_str in id_list:
                parts = id_str.split(":", 1)
                scheme = parts[0]
                value = parts[1]

                # Make sure both scheme and value are extracted correctly
                if scheme != "doi" or not value.startswith("10."):
                    continue

                # Check if the full ID is in our list of valid IDs
                if id_str not in self.existing_dois_in_sparql:
                    return False

            return True

        mock_check.side_effect = side_effect

        # Create the first file with some data
        file1_path = os.path.join(self.test_dir, 'data1_sparql.csv')
        file1_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/456","Different Title","Other Author","2024","Test Venue","1","1","11-20","journal article","Test Publisher",'''

        # Create a second file with some duplicates of rows from the first file
        file2_path = os.path.join(self.test_dir, 'data2_sparql.csv')
        file2_data = '''id,title,author,pub_date,venue,volume,issue,page,type,publisher,editor
"doi:10.INVALID/123","Test Title","Test Author","2024","Test Venue","1","1","1-10","journal article","Test Publisher",
"doi:10.INVALID/789","New Title","New Author","2024","Test Venue","1","1","21-30","journal article","Test Publisher",'''

        with open(file1_path, 'w', encoding='utf-8') as f:
            f.write(file1_data)
        with open(file2_path, 'w', encoding='utf-8') as f:
            f.write(file2_data)

        # Process both files using the same seen_rows set and pending_rows list
        seen_rows = set()
        pending_rows = []
        next_file_num, stats1, pending_rows = process_csv_file(
            file1_path,
            self.output_dir,
            0,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )
        next_file_num, stats2, pending_rows = process_csv_file(
            file2_path,
            self.output_dir,
            next_file_num,
            storage_type='sparql',
            storage_reference=self.sparql_endpoint,
            seen_rows=seen_rows,
            pending_rows=pending_rows
        )

        # Write final pending rows
        if pending_rows:
            output_file = os.path.join(self.output_dir, f"{next_file_num}.csv")
            with open(output_file, 'w', encoding='utf-8', newline='') as out_f:
                writer = csv.DictWriter(out_f, fieldnames=pending_rows[0].keys())
                writer.writeheader()
                writer.writerows(pending_rows)

        # Verify statistics
        self.assertEqual(stats1.total_rows, 2)
        self.assertEqual(stats1.duplicate_rows, 0)
        self.assertEqual(stats1.processed_rows, 2)

        self.assertEqual(stats2.total_rows, 2)
        self.assertEqual(stats2.duplicate_rows, 1)  # One row should be detected as duplicate
        self.assertEqual(stats2.processed_rows, 1)  # Only one new row should be processed

        # Check output files
        output_files = sorted(os.listdir(self.output_dir))
        self.assertEqual(len(output_files), 1)  # Should create only one file

        # Verify final output contains only unique rows
        output_file = os.path.join(self.output_dir, '0.csv')
        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertEqual(len(rows), 3)  # Should have 3 unique rows total
            unique_ids = set(row['id'] for row in rows)
            self.assertEqual(len(unique_ids), 3)
            self.assertIn('doi:10.INVALID/123', unique_ids)
            self.assertIn('doi:10.INVALID/456', unique_ids)
            self.assertIn('doi:10.INVALID/789', unique_ids)


if __name__ == '__main__':
    unittest.main()