Coverage for test / test_meta_runner.py: 100%
371 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 14:31 +0000
1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import csv
6import os
7import shutil
8import time
9import unittest
10from unittest.mock import Mock, patch
12import requests
13from crowdsourcing.meta_runner import (
14 check_triplestore_connection,
15 get_closed_issues,
16 get_ingestion_dirs,
17 process_meta_issues,
18 process_single_issue,
19 store_meta_input,
20 update_issue_labels,
21)
class TestMetaRunner(unittest.TestCase):
    """Unit tests for the helper functions in ``crowdsourcing.meta_runner``.

    Covers ingestion-directory creation, GitHub issue retrieval (including
    retry, rate-limit and error handling), parsing/storage of issue bodies as
    OC Meta CSV input, triplestore connectivity checks, and end-to-end
    processing of a single issue.
    """

    def setUp(self):
        """Provide the environment variables the module under test reads."""
        # Create a temporary environment variable for testing
        os.environ["GH_TOKEN"] = "test_token"
        os.environ["GITHUB_REPOSITORY"] = "test/repo"

    def tearDown(self):
        """Remove on-disk artefacts and the test environment variables."""
        # Clean up any created directories
        if os.path.exists("crowdsourcing_ingestion_data"):
            shutil.rmtree("crowdsourcing_ingestion_data")
        # Remove environment variables
        del os.environ["GH_TOKEN"]
        del os.environ["GITHUB_REPOSITORY"]

    @patch("time.strftime")
    def test_get_ingestion_dirs(self, mock_strftime):
        """get_ingestion_dirs returns month-stamped paths and creates them."""
        # Mock the current date
        mock_strftime.return_value = "2024_03"

        # Call the function
        base_dir, metadata_dir, citations_dir = get_ingestion_dirs()

        # Check the returned paths
        self.assertEqual(
            base_dir, os.path.join("crowdsourcing_ingestion_data", "2024_03")
        )
        self.assertEqual(metadata_dir, os.path.join(base_dir, "metadata"))
        self.assertEqual(citations_dir, os.path.join(base_dir, "citations"))

        # Check that directories were created
        self.assertTrue(os.path.exists(metadata_dir))
        self.assertTrue(os.path.exists(citations_dir))

    @patch("requests.get")
    def test_get_closed_issues_success(self, mock_get):
        """A 200 response yields the parsed issue list via one API call."""
        # Mock successful response
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = [
            {
                "body": "test body 1",
                "number": 1,
                "user": {
                    "login": "test-user-1",
                    "html_url": "https://github.com/test-user-1",
                    "id": 12345,
                },
            },
            {
                "body": "test body 2",
                "number": 2,
                "user": {
                    "login": "test-user-2",
                    "html_url": "https://github.com/test-user-2",
                    "id": 67890,
                },
            },
        ]
        mock_get.return_value = mock_response

        # Call the function
        issues = get_closed_issues()

        # Verify the results (issue numbers are normalised to strings)
        self.assertEqual(len(issues), 2)
        self.assertEqual(issues[0]["body"], "test body 1")
        self.assertEqual(issues[0]["number"], "1")
        self.assertEqual(issues[0]["user"]["login"], "test-user-1")
        self.assertEqual(issues[0]["user"]["id"], 12345)

        # Verify the API call
        mock_get.assert_called_once_with(
            "https://api.github.com/repos/test/repo/issues",
            params={"state": "closed", "labels": "to be processed"},
            headers={
                "Accept": "application/vnd.github+json",
                "Authorization": "Bearer test_token",
            },
            timeout=30,
        )

    @patch("requests.get")
    def test_get_closed_issues_404(self, mock_get):
        """A 404 response is treated as 'no issues', not as an error."""
        # Mock 404 response
        mock_response = Mock()
        mock_response.status_code = 404
        mock_get.return_value = mock_response

        # Call the function
        issues = get_closed_issues()

        # Verify empty result
        self.assertEqual(issues, [])

    @patch("requests.get")
    @patch("time.sleep")  # Mock sleep so the rate-limit wait does not slow the test
    def test_get_closed_issues_rate_limit(self, mock_sleep, mock_get):
        """A 403 rate-limit response is retried and finally yields no issues."""
        # Mock rate limit response
        mock_response = Mock()
        mock_response.status_code = 403
        mock_response.headers = {
            "X-RateLimit-Remaining": "0",
            "X-RateLimit-Reset": str(int(time.time()) + 1),
        }
        mock_get.return_value = mock_response

        # Call the function
        issues = get_closed_issues()

        # Verify empty result and multiple attempts
        self.assertEqual(issues, [])
        self.assertEqual(mock_get.call_count, 3)  # MAX_RETRIES

    @patch("requests.get")
    @patch("time.sleep")  # Mock sleep so the retry back-off does not slow the test
    def test_get_closed_issues_unexpected_status(self, mock_sleep, mock_get):
        """Test handling of unexpected HTTP status codes."""
        # Mock response with unexpected status code
        mock_response = Mock()
        mock_response.status_code = 500
        mock_response.text = "Internal Server Error"
        mock_get.return_value = mock_response

        # Call the function
        issues = get_closed_issues()

        # Verify empty result after all retries
        self.assertEqual(issues, [])
        self.assertEqual(mock_get.call_count, 3)  # Should retry MAX_RETRIES times

    @patch("requests.get")
    @patch("time.sleep")  # Mock sleep to speed up test
    def test_get_closed_issues_request_exception(self, mock_sleep, mock_get):
        """Test handling of request exceptions with retries."""
        # Mock get to raise exception
        mock_get.side_effect = requests.RequestException("Connection error")

        # Verify exception is raised after all retries
        with self.assertRaises(RuntimeError) as context:
            get_closed_issues()

        self.assertEqual(
            str(context.exception), "Failed to fetch issues after 3 attempts"
        )
        self.assertEqual(mock_get.call_count, 3)  # Should retry MAX_RETRIES times
        self.assertEqual(
            mock_sleep.call_count, 2
        )  # Should sleep RETRY_DELAY times (attempts - 1)
        mock_sleep.assert_called_with(5)  # Verify sleep duration

    @patch("requests.get")
    @patch("time.sleep")  # Mock sleep so the retry back-off does not slow the test
    def test_get_closed_issues_key_error(self, mock_sleep, mock_get):
        """Test handling of KeyError in response parsing."""
        # Mock response with missing required key
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = [{"number": 1}]  # Missing 'body' key
        mock_get.return_value = mock_response

        # Verify exception is raised after all retries
        with self.assertRaises(RuntimeError) as context:
            get_closed_issues()

        self.assertEqual(
            str(context.exception), "Failed to fetch issues after 3 attempts"
        )
        self.assertEqual(mock_get.call_count, 3)  # Should retry MAX_RETRIES times

    def test_store_meta_input_success(self):
        """A well-formed issue body is split into metadata and citation CSVs."""
        # Create test data
        issues = [
            {
                "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n123,456",
                "number": "1",
            }
        ]

        # Call the function
        store_meta_input(issues)

        # Get the created directories
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_file = os.path.join(base_dir, "metadata", "0.csv")
        citations_file = os.path.join(base_dir, "citations", "0.csv")

        # Check that files were created
        self.assertTrue(os.path.exists(metadata_file))
        self.assertTrue(os.path.exists(citations_file))

        # Verify metadata content
        with open(metadata_file, "r") as f:
            metadata = list(csv.DictReader(f))
            self.assertEqual(len(metadata), 1)
            self.assertEqual(metadata[0]["id"], "1")
            self.assertEqual(metadata[0]["title"], "Test Title")

        # Verify citations content
        with open(citations_file, "r") as f:
            citations = list(csv.DictReader(f))
            self.assertEqual(len(citations), 1)
            self.assertEqual(citations[0]["citing"], "123")
            self.assertEqual(citations[0]["cited"], "456")

    def test_store_meta_input_invalid_separator(self):
        """Bodies without the expected separator are skipped entirely."""
        # Create test data with invalid separator
        issues = [
            {
                "body": "id,title\n1,Test Title\nInvalid Separator\nciting,cited\n123,456",
                "number": "1",
            }
        ]

        # Call the function
        store_meta_input(issues)

        # Check that no files were created
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")

        self.assertTrue(os.path.exists(metadata_dir))  # Directories should exist
        self.assertTrue(os.path.exists(citations_dir))
        self.assertEqual(len(os.listdir(metadata_dir)), 0)  # But should be empty
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_empty_sections(self):
        """Empty metadata and citation sections produce no output files."""
        # Create test data with empty sections
        issues = [{"body": "\n===###===@@@===\n", "number": "1"}]

        # Call the function
        store_meta_input(issues)

        # Check that no files were created
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")

        self.assertTrue(os.path.exists(metadata_dir))
        self.assertTrue(os.path.exists(citations_dir))
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_empty_citations_section(self):
        """An empty citations section causes the whole issue to be skipped."""
        # Test with empty citations section
        issues = [{"body": "id,title\n1,Test Title\n===###===@@@===\n", "number": "1"}]

        # Call the function
        store_meta_input(issues)

        # Check that no files were created since we should have continued
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_empty_metadata_records(self):
        """A header-only metadata section yields no stored files."""
        # Test with empty metadata records after parsing
        issues = [
            {"body": "id,title\n===###===@@@===citing,cited\n123,456", "number": "1"}
        ]

        # Call the function
        store_meta_input(issues)

        # Check that no files were created since we should have continued
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_empty_citation_records(self):
        """A header-only citations section yields no stored files."""
        # Test with empty citation records after parsing
        issues = [
            {
                "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n",
                "number": "1",
            }
        ]

        # Call the function
        store_meta_input(issues)

        # Check that no files were created since we should have continued
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_key_error(self):
        """Issues missing the required 'body' key are skipped gracefully."""
        # Test with missing required key that will raise KeyError
        issues = [{"wrong_key": "value"}]  # Missing 'body' key

        # Call the function
        store_meta_input(issues)

        # Check that no files were created due to KeyError
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_thousand_record_limit(self):
        """Parsed records are chunked into CSV files of at most 1000 rows."""
        # Create test data with more than 1000 records
        metadata_rows = ["1,Test Title"] * 1200  # 1200 identical rows
        citation_rows = ["123,456"] * 1200  # 1200 identical rows

        issues = [
            {
                "body": f"id,title\n{chr(10).join(metadata_rows)}\n===###===@@@===citing,cited\n{chr(10).join(citation_rows)}",
                "number": "1",
            }
        ]

        # Call the function
        store_meta_input(issues)

        # Get the created directories
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")

        # Should have 2 files in each directory (1000 records in first file, 200 in second)
        self.assertEqual(len(os.listdir(metadata_dir)), 2)
        self.assertEqual(len(os.listdir(citations_dir)), 2)

        # Verify content of first metadata file (should have 1000 records)
        with open(os.path.join(metadata_dir, "0.csv"), "r") as f:
            metadata = list(csv.DictReader(f))
            self.assertEqual(len(metadata), 1000)

        # Verify content of second metadata file (should have 200 records)
        with open(os.path.join(metadata_dir, "1.csv"), "r") as f:
            metadata = list(csv.DictReader(f))
            self.assertEqual(len(metadata), 200)

        # Verify content of first citations file (should have 1000 records)
        with open(os.path.join(citations_dir, "0.csv"), "r") as f:
            citations = list(csv.DictReader(f))
            self.assertEqual(len(citations), 1000)

        # Verify content of second citations file (should have 200 records)
        with open(os.path.join(citations_dir, "1.csv"), "r") as f:
            citations = list(csv.DictReader(f))
            self.assertEqual(len(citations), 200)

    @patch("SPARQLWrapper.SPARQLWrapper.query")
    def test_check_triplestore_connection_success(self, mock_query):
        """A query that does not raise means the triplestore is reachable."""
        # Test successful connection
        result = check_triplestore_connection("http://example.com/sparql")
        self.assertTrue(result)
        mock_query.assert_called_once()

    @patch("SPARQLWrapper.SPARQLWrapper.query")
    def test_check_triplestore_connection_failure(self, mock_query):
        """A query that raises is reported as an unreachable triplestore."""
        # Test failed connection
        mock_query.side_effect = Exception("Connection failed")
        result = check_triplestore_connection("http://example.com/sparql")
        self.assertFalse(result)
        mock_query.assert_called_once()

    @patch("crowdsourcing.meta_runner.run_meta_process")
    def test_process_single_issue_success(self, mock_run_meta):
        """A valid issue is stored, processed, and its temp config removed."""
        # Prepare test data
        issue = {
            "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n123,456",
            "number": "1",
            "user": {
                "login": "test-user",
                "html_url": "https://github.com/test-user",
                "id": 12345,
            },
        }
        base_settings = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/mnt/arcangelo/meta_output_current",
        }

        # Run the function
        success = process_single_issue(issue, base_settings)

        # Verify results
        self.assertTrue(success)
        mock_run_meta.assert_called_once()

        # Check that temporary config was created and then deleted
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        temp_config_path = os.path.join(base_dir, "meta_config_1.yaml")
        self.assertFalse(os.path.exists(temp_config_path))

        # Verify metadata and citations were stored
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertTrue(os.path.exists(metadata_dir))
        self.assertTrue(os.path.exists(citations_dir))
        self.assertEqual(len(os.listdir(metadata_dir)), 1)
        self.assertEqual(len(os.listdir(citations_dir)), 1)

    @patch("crowdsourcing.meta_runner.run_meta_process")
    def test_process_single_issue_meta_process_failure(self, mock_run_meta):
        """A failing meta process returns False and still cleans up config."""
        # Prepare test data
        issue = {
            "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n123,456",
            "number": "1",
            "user": {
                "login": "test-user",
                "html_url": "https://github.com/test-user",
                "id": 12345,
            },
        }
        base_settings = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/mnt/arcangelo/meta_output_current",
        }

        # Make meta_process raise an exception
        mock_run_meta.side_effect = Exception("Meta process failed")

        # Run the function
        success = process_single_issue(issue, base_settings)

        # Verify results
        self.assertFalse(success)
        mock_run_meta.assert_called_once()

        # Check that temporary config was cleaned up
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        temp_config_path = os.path.join(base_dir, "meta_config_1.yaml")
        self.assertFalse(os.path.exists(temp_config_path))

    def test_process_single_issue_invalid_issue(self):
        """An unparsable issue body fails fast without storing any input."""
        # Test with invalid issue data
        issue = {
            "body": "Invalid issue body without separator",
            "number": "1",
            "user": {
                "login": "test-user",
                "html_url": "https://github.com/test-user",
                "id": 12345,
            },
        }
        base_settings = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/mnt/arcangelo/meta_output_current",
        }

        # Run the function
        success = process_single_issue(issue, base_settings)

        # Verify results
        self.assertFalse(success)

        # Check that no files were created
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertTrue(os.path.exists(metadata_dir))
        self.assertTrue(os.path.exists(citations_dir))
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    @patch("crowdsourcing.meta_runner.run_meta_process")
    def test_process_single_issue_settings_update(self, mock_run_meta):
        """Per-issue settings (source, resp_agent, input dir) are injected."""
        # Prepare test data
        issue = {
            "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n123,456",
            "number": "1",
            "user": {
                "login": "test-user",
                "html_url": "https://github.com/test-user",
                "id": 12345,
            },
        }
        base_settings = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/mnt/arcangelo/meta_output_current",
            "some_other_setting": "value",
        }

        # Run the function
        success = process_single_issue(issue, base_settings)

        # Verify results
        self.assertTrue(success)

        # Check that run_meta_process was called with correct settings
        called_settings = mock_run_meta.call_args[1]["settings"]
        self.assertEqual(
            called_settings["source"],
            f"https://github.com/{os.environ['GITHUB_REPOSITORY']}/issues/1",
        )
        self.assertEqual(
            called_settings["resp_agent"], "https://api.github.com/user/12345"
        )
        self.assertEqual(called_settings["some_other_setting"], "value")
        self.assertTrue(called_settings["input_csv_dir"].endswith("metadata"))

    @patch("crowdsourcing.meta_runner.run_meta_process")
    def test_process_single_issue_general_exception(self, mock_run_meta):
        """Test handling of general exceptions in process_single_issue."""
        # Prepare test data
        issue = {
            "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n123,456",
            "number": "1",
            "user": {
                "login": "test-user",
                "html_url": "https://github.com/test-user",
                "id": 12345,
            },
        }
        base_settings = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/test/output",
        }

        # Make run_meta_process raise a general exception
        mock_run_meta.side_effect = Exception("Unexpected error")

        # Run the function
        success = process_single_issue(issue, base_settings)

        # Verify results
        self.assertFalse(success)
        mock_run_meta.assert_called_once()

        # Check that temporary config was cleaned up
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        temp_config_path = os.path.join(base_dir, "meta_config_1.yaml")
        self.assertFalse(os.path.exists(temp_config_path))
class TestUpdateIssueLabels(unittest.TestCase):
    """Behavioural tests for the update_issue_labels function."""

    def setUp(self):
        """Patch GitHub credentials and precompute the expected URL/headers."""
        self.issue_number = "123"
        self.env_patcher = patch.dict(
            "os.environ",
            {"GH_TOKEN": "fake-token", "GITHUB_REPOSITORY": "test/repo"},
        )
        self.env_patcher.start()
        repo = "test/repo"
        self.base_url = (
            f"https://api.github.com/repos/{repo}/issues/{self.issue_number}"
        )
        self.headers = {
            "Accept": "application/vnd.github+json",
            "Authorization": "Bearer fake-token",
            "X-GitHub-Api-Version": "2022-11-28",
        }

    def tearDown(self):
        """Stop the environment patcher started in setUp."""
        self.env_patcher.stop()

    @patch("requests.delete")
    @patch("requests.post")
    def test_successful_update_on_success(self, mock_post, mock_delete):
        """A successful run swaps 'to be processed' for the 'done' label."""
        mock_delete.return_value = Mock(status_code=200)
        mock_post.return_value = Mock(status_code=201)

        update_issue_labels(self.issue_number, success=True)

        # The stale label must be removed first...
        mock_delete.assert_called_once_with(
            f"{self.base_url}/labels/to%20be%20processed",
            headers=self.headers,
            timeout=30,
        )
        # ...then the outcome label is attached.
        mock_post.assert_called_once_with(
            f"{self.base_url}/labels",
            headers=self.headers,
            json={"labels": ["done"]},
            timeout=30,
        )

    @patch("requests.delete")
    @patch("requests.post")
    def test_successful_update_on_failure(self, mock_post, mock_delete):
        """A failed run swaps 'to be processed' for 'oc meta error'."""
        mock_delete.return_value = Mock(status_code=200)
        mock_post.return_value = Mock(status_code=201)

        update_issue_labels(self.issue_number, success=False)

        # The stale label must be removed first...
        mock_delete.assert_called_once_with(
            f"{self.base_url}/labels/to%20be%20processed",
            headers=self.headers,
            timeout=30,
        )
        # ...then the error label is attached.
        mock_post.assert_called_once_with(
            f"{self.base_url}/labels",
            headers=self.headers,
            json={"labels": ["oc meta error"]},
            timeout=30,
        )

    @patch("requests.delete")
    def test_delete_label_error(self, mock_delete):
        """A network failure while removing the label reaches the caller."""
        mock_delete.side_effect = requests.RequestException("Network error")

        with self.assertRaises(requests.RequestException) as context:
            update_issue_labels(self.issue_number, success=True)

        self.assertEqual(str(context.exception), "Network error")
        mock_delete.assert_called_once()

    @patch("requests.delete")
    @patch("requests.post")
    def test_add_label_error(self, mock_post, mock_delete):
        """A network failure while adding the label reaches the caller."""
        mock_delete.return_value = Mock(status_code=200)
        mock_post.side_effect = requests.RequestException("Network error")

        with self.assertRaises(requests.RequestException) as context:
            update_issue_labels(self.issue_number, success=True)

        self.assertEqual(str(context.exception), "Network error")
        mock_delete.assert_called_once()
        mock_post.assert_called_once()

    @patch("requests.delete")
    @patch("requests.post")
    @patch("crowdsourcing.meta_runner.logger.error")
    def test_delete_label_non_200_response(
        self, mock_log_error, mock_post, mock_delete
    ):
        """A non-200 delete response is logged but does not stop the update."""
        mock_delete.return_value = Mock(status_code=404, text="Label not found")
        mock_post.return_value = Mock(status_code=201)

        update_issue_labels(self.issue_number, success=True)

        mock_log_error.assert_called_with(
            "Error response from delete label: Label not found"
        )
        # The new label is still added despite the failed removal.
        mock_post.assert_called_once()

    @patch("requests.delete")
    @patch("requests.post")
    @patch("crowdsourcing.meta_runner.logger.error")
    def test_add_label_non_200_response(self, mock_log_error, mock_post, mock_delete):
        """A non-200/201 response when adding the label is logged as an error."""
        mock_delete.return_value = Mock(status_code=200)
        mock_post.return_value = Mock(status_code=422, text="Validation failed")

        update_issue_labels(self.issue_number, success=True)

        mock_log_error.assert_called_with("Error adding label: Validation failed")
class TestProcessMetaIssues(unittest.TestCase):
    """Tests for the top-level process_meta_issues orchestrator."""

    def setUp(self):
        """Patch credentials and make yaml.safe_load return a fixed config."""
        self.env_patcher = patch.dict(
            "os.environ",
            {"GH_TOKEN": "fake-token", "GITHUB_REPOSITORY": "test/repo"},
        )
        self.env_patcher.start()

        # Fixed configuration handed to every test through yaml.safe_load.
        self.config_content = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/test/output",
        }
        self.yaml_patcher = patch("yaml.safe_load")
        self.mock_yaml_load = self.yaml_patcher.start()
        self.mock_yaml_load.return_value = self.config_content

    def tearDown(self):
        """Stop every patcher started in setUp."""
        self.env_patcher.stop()
        self.yaml_patcher.stop()

    @patch("crowdsourcing.meta_runner.check_triplestore_connection")
    @patch("crowdsourcing.meta_runner.get_closed_issues")
    @patch("crowdsourcing.meta_runner.process_single_issue")
    @patch("crowdsourcing.meta_runner.update_issue_labels")
    def test_successful_processing(
        self, mock_update_labels, mock_process_issue, mock_get_issues, mock_check_conn
    ):
        """Every fetched issue is processed and labelled as successful."""
        mock_check_conn.return_value = True
        issues = [
            {"body": "test1", "number": "1"},
            {"body": "test2", "number": "2"},
        ]
        mock_get_issues.return_value = issues
        mock_process_issue.return_value = True

        process_meta_issues()

        # Both issues went through processing and labelling.
        self.assertEqual(mock_process_issue.call_count, 2)
        self.assertEqual(mock_update_labels.call_count, 2)

        # Each issue was handed over with the shared config and marked done.
        for issue in issues:
            mock_process_issue.assert_any_call(issue, self.config_content)
            mock_update_labels.assert_any_call(issue["number"], True)

    @patch("crowdsourcing.meta_runner.check_triplestore_connection")
    def test_triplestore_not_responsive(self, mock_check_conn):
        """An unreachable triplestore aborts processing immediately."""
        mock_check_conn.return_value = False

        process_meta_issues()

        mock_check_conn.assert_called_once_with(self.config_content["triplestore_url"])

    @patch("crowdsourcing.meta_runner.check_triplestore_connection")
    @patch("crowdsourcing.meta_runner.get_closed_issues")
    def test_no_issues_to_process(self, mock_get_issues, mock_check_conn):
        """An empty issue list results in an early, silent return."""
        mock_check_conn.return_value = True
        mock_get_issues.return_value = []

        process_meta_issues()

        mock_get_issues.assert_called_once()

    @patch("crowdsourcing.meta_runner.check_triplestore_connection")
    @patch("crowdsourcing.meta_runner.get_closed_issues")
    @patch("crowdsourcing.meta_runner.process_single_issue")
    @patch("crowdsourcing.meta_runner.update_issue_labels")
    def test_mixed_processing_results(
        self, mock_update_labels, mock_process_issue, mock_get_issues, mock_check_conn
    ):
        """Per-issue outcomes are reflected in the labels that get applied."""
        mock_check_conn.return_value = True
        mock_get_issues.return_value = [
            {"body": "test1", "number": "1"},
            {"body": "test2", "number": "2"},
        ]
        # First issue succeeds, second fails.
        mock_process_issue.side_effect = [True, False]

        process_meta_issues()

        self.assertEqual(mock_process_issue.call_count, 2)
        self.assertEqual(mock_update_labels.call_count, 2)

        # Labels must track each issue's individual result.
        mock_update_labels.assert_any_call("1", True)
        mock_update_labels.assert_any_call("2", False)

    @patch("crowdsourcing.meta_runner.check_triplestore_connection")
    @patch("crowdsourcing.meta_runner.get_closed_issues")
    @patch("crowdsourcing.meta_runner.process_single_issue")
    @patch("crowdsourcing.meta_runner.update_issue_labels")
    def test_error_handling(
        self, mock_update_labels, mock_process_issue, mock_get_issues, mock_check_conn
    ):
        """Unexpected exceptions raised during processing are propagated."""
        mock_check_conn.return_value = True
        mock_get_issues.return_value = [{"body": "test1", "number": "1"}]
        mock_process_issue.side_effect = Exception("Processing error")

        with self.assertRaises(Exception) as context:
            process_meta_issues()

        self.assertEqual(str(context.exception), "Processing error")
if __name__ == "__main__":  # pragma: no cover
    # Allow running this test module directly: `python test_meta_runner.py`.
    unittest.main()