Coverage for test / test_meta_runner.py: 100%

371 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-21 14:31 +0000

1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import csv 

6import os 

7import shutil 

8import time 

9import unittest 

10from unittest.mock import Mock, patch 

11 

12import requests 

13from crowdsourcing.meta_runner import ( 

14 check_triplestore_connection, 

15 get_closed_issues, 

16 get_ingestion_dirs, 

17 process_meta_issues, 

18 process_single_issue, 

19 store_meta_input, 

20 update_issue_labels, 

21) 

22 

23 

class TestMetaRunner(unittest.TestCase):
    """Tests for the helper functions in crowdsourcing.meta_runner.

    Covers directory creation, GitHub issue fetching (including retry and
    rate-limit paths), CSV storage of issue bodies, triplestore connectivity
    checks, and single-issue processing.
    """

    def setUp(self):
        """Provide the GitHub credentials the module under test reads from the environment."""
        # Create a temporary environment variable for testing
        os.environ["GH_TOKEN"] = "test_token"
        os.environ["GITHUB_REPOSITORY"] = "test/repo"

    def tearDown(self):
        """Remove on-disk ingestion output and the variables set in setUp."""
        # Clean up any created directories
        if os.path.exists("crowdsourcing_ingestion_data"):
            shutil.rmtree("crowdsourcing_ingestion_data")
        # Remove environment variables
        # NOTE(review): assumes setUp ran; any pre-existing values are not restored.
        del os.environ["GH_TOKEN"]
        del os.environ["GITHUB_REPOSITORY"]

    @patch("time.strftime")
    def test_get_ingestion_dirs(self, mock_strftime):
        """get_ingestion_dirs returns month-stamped paths and creates them on disk."""
        # Mock the current date
        mock_strftime.return_value = "2024_03"

        # Call the function
        base_dir, metadata_dir, citations_dir = get_ingestion_dirs()

        # Check the returned paths
        self.assertEqual(
            base_dir, os.path.join("crowdsourcing_ingestion_data", "2024_03")
        )
        self.assertEqual(metadata_dir, os.path.join(base_dir, "metadata"))
        self.assertEqual(citations_dir, os.path.join(base_dir, "citations"))

        # Check that directories were created
        self.assertTrue(os.path.exists(metadata_dir))
        self.assertTrue(os.path.exists(citations_dir))

    @patch("requests.get")
    def test_get_closed_issues_success(self, mock_get):
        """A 200 response yields the parsed issue list and a single API call."""
        # Mock successful response
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = [
            {
                "body": "test body 1",
                "number": 1,
                "user": {
                    "login": "test-user-1",
                    "html_url": "https://github.com/test-user-1",
                    "id": 12345,
                },
            },
            {
                "body": "test body 2",
                "number": 2,
                "user": {
                    "login": "test-user-2",
                    "html_url": "https://github.com/test-user-2",
                    "id": 67890,
                },
            },
        ]
        mock_get.return_value = mock_response

        # Call the function
        issues = get_closed_issues()

        # Verify the results
        self.assertEqual(len(issues), 2)
        self.assertEqual(issues[0]["body"], "test body 1")
        # The API returns the issue number as an int (1 above); this asserts a
        # string, so get_closed_issues presumably stringifies it — confirm in impl.
        self.assertEqual(issues[0]["number"], "1")
        self.assertEqual(issues[0]["user"]["login"], "test-user-1")
        self.assertEqual(issues[0]["user"]["id"], 12345)

        # Verify the API call
        mock_get.assert_called_once_with(
            "https://api.github.com/repos/test/repo/issues",
            params={"state": "closed", "labels": "to be processed"},
            headers={
                "Accept": "application/vnd.github+json",
                "Authorization": "Bearer test_token",
            },
            timeout=30,
        )

    @patch("requests.get")
    def test_get_closed_issues_404(self, mock_get):
        """A 404 response yields an empty issue list without retrying."""
        # Mock 404 response
        mock_response = Mock()
        mock_response.status_code = 404
        mock_get.return_value = mock_response

        # Call the function
        issues = get_closed_issues()

        # Verify empty result
        self.assertEqual(issues, [])

    @patch("requests.get")
    def test_get_closed_issues_rate_limit(self, mock_get):
        """A 403 rate-limit response is retried MAX_RETRIES times, then gives up."""
        # Mock rate limit response
        mock_response = Mock()
        mock_response.status_code = 403
        mock_response.headers = {
            "X-RateLimit-Remaining": "0",
            # Reset one second in the future so the wait is negligible
            "X-RateLimit-Reset": str(int(time.time()) + 1),
        }
        mock_get.return_value = mock_response

        # Call the function
        issues = get_closed_issues()

        # Verify empty result and multiple attempts
        self.assertEqual(issues, [])
        self.assertEqual(mock_get.call_count, 3)  # MAX_RETRIES

    @patch("requests.get")
    def test_get_closed_issues_unexpected_status(self, mock_get):
        """Test handling of unexpected HTTP status codes."""
        # Mock response with unexpected status code
        mock_response = Mock()
        mock_response.status_code = 500
        mock_response.text = "Internal Server Error"
        mock_get.return_value = mock_response

        # Call the function
        issues = get_closed_issues()

        # Verify empty result after all retries
        self.assertEqual(issues, [])
        self.assertEqual(mock_get.call_count, 3)  # Should retry MAX_RETRIES times

    @patch("requests.get")
    @patch("time.sleep")  # Mock sleep to speed up test
    def test_get_closed_issues_request_exception(self, mock_sleep, mock_get):
        """Test handling of request exceptions with retries."""
        # Mock get to raise exception
        mock_get.side_effect = requests.RequestException("Connection error")

        # Verify exception is raised after all retries
        with self.assertRaises(RuntimeError) as context:
            get_closed_issues()

        self.assertEqual(
            str(context.exception), "Failed to fetch issues after 3 attempts"
        )
        self.assertEqual(mock_get.call_count, 3)  # Should retry MAX_RETRIES times
        self.assertEqual(
            mock_sleep.call_count, 2
        )  # Should sleep RETRY_DELAY times (attempts - 1)
        mock_sleep.assert_called_with(5)  # Verify sleep duration

    @patch("requests.get")
    def test_get_closed_issues_key_error(self, mock_get):
        """Test handling of KeyError in response parsing."""
        # Mock response with missing required key
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = [{"number": 1}]  # Missing 'body' key
        mock_get.return_value = mock_response

        # Verify exception is raised after all retries
        with self.assertRaises(RuntimeError) as context:
            get_closed_issues()

        self.assertEqual(
            str(context.exception), "Failed to fetch issues after 3 attempts"
        )
        self.assertEqual(mock_get.call_count, 3)  # Should retry MAX_RETRIES times

    def test_store_meta_input_success(self):
        """A well-formed issue body is split into metadata and citations CSVs."""
        # Create test data: metadata and citations separated by the marker string
        issues = [
            {
                "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n123,456",
                "number": "1",
            }
        ]

        # Call the function
        store_meta_input(issues)

        # Get the created directories
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_file = os.path.join(base_dir, "metadata", "0.csv")
        citations_file = os.path.join(base_dir, "citations", "0.csv")

        # Check that files were created
        self.assertTrue(os.path.exists(metadata_file))
        self.assertTrue(os.path.exists(citations_file))

        # Verify metadata content
        with open(metadata_file, "r") as f:
            metadata = list(csv.DictReader(f))
            self.assertEqual(len(metadata), 1)
            self.assertEqual(metadata[0]["id"], "1")
            self.assertEqual(metadata[0]["title"], "Test Title")

        # Verify citations content
        with open(citations_file, "r") as f:
            citations = list(csv.DictReader(f))
            self.assertEqual(len(citations), 1)
            self.assertEqual(citations[0]["citing"], "123")
            self.assertEqual(citations[0]["cited"], "456")

    def test_store_meta_input_invalid_separator(self):
        """A body without the separator marker produces no output files."""
        # Create test data with invalid separator
        issues = [
            {
                "body": "id,title\n1,Test Title\nInvalid Separator\nciting,cited\n123,456",
                "number": "1",
            }
        ]

        # Call the function
        store_meta_input(issues)

        # Check that no files were created
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")

        self.assertTrue(os.path.exists(metadata_dir))  # Directories should exist
        self.assertTrue(os.path.exists(citations_dir))
        self.assertEqual(len(os.listdir(metadata_dir)), 0)  # But should be empty
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_empty_sections(self):
        """Empty sections on both sides of the separator produce no files."""
        # Create test data with empty sections
        issues = [{"body": "\n===###===@@@===\n", "number": "1"}]

        # Call the function
        store_meta_input(issues)

        # Check that no files were created
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")

        self.assertTrue(os.path.exists(metadata_dir))
        self.assertTrue(os.path.exists(citations_dir))
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_empty_citations_section(self):
        """An empty citations section makes the whole issue be skipped."""
        # Test with empty citations section
        issues = [{"body": "id,title\n1,Test Title\n===###===@@@===\n", "number": "1"}]

        # Call the function
        store_meta_input(issues)

        # Check that no files were created since we should have continued
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_empty_metadata_records(self):
        """A metadata section with only a header row makes the issue be skipped."""
        # Test with empty metadata records after parsing
        issues = [
            {"body": "id,title\n===###===@@@===citing,cited\n123,456", "number": "1"}
        ]

        # Call the function
        store_meta_input(issues)

        # Check that no files were created since we should have continued
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_empty_citation_records(self):
        """A citations section with only a header row makes the issue be skipped."""
        # Test with empty citation records after parsing
        issues = [
            {
                "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n",
                "number": "1",
            }
        ]

        # Call the function
        store_meta_input(issues)

        # Check that no files were created since we should have continued
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_key_error(self):
        """An issue missing the 'body' key is handled without writing files."""
        # Test with missing required key that will raise KeyError
        issues = [{"wrong_key": "value"}]  # Missing 'body' key

        # Call the function
        store_meta_input(issues)

        # Check that no files were created due to KeyError
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    def test_store_meta_input_thousand_record_limit(self):
        """Records are chunked into files of at most 1000 rows each."""
        # Create test data with more than 1000 records
        metadata_rows = ["1,Test Title"] * 1200  # 1200 identical rows
        citation_rows = ["123,456"] * 1200  # 1200 identical rows

        # chr(10) is '\n'; join the rows into one issue body around the separator
        issues = [
            {
                "body": f"id,title\n{chr(10).join(metadata_rows)}\n===###===@@@===citing,cited\n{chr(10).join(citation_rows)}",
                "number": "1",
            }
        ]

        # Call the function
        store_meta_input(issues)

        # Get the created directories
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")

        # Should have 2 files in each directory (1000 records in first file, 200 in second)
        self.assertEqual(len(os.listdir(metadata_dir)), 2)
        self.assertEqual(len(os.listdir(citations_dir)), 2)

        # Verify content of first metadata file (should have 1000 records)
        with open(os.path.join(metadata_dir, "0.csv"), "r") as f:
            metadata = list(csv.DictReader(f))
            self.assertEqual(len(metadata), 1000)

        # Verify content of second metadata file (should have 200 records)
        with open(os.path.join(metadata_dir, "1.csv"), "r") as f:
            metadata = list(csv.DictReader(f))
            self.assertEqual(len(metadata), 200)

        # Verify content of first citations file (should have 1000 records)
        with open(os.path.join(citations_dir, "0.csv"), "r") as f:
            citations = list(csv.DictReader(f))
            self.assertEqual(len(citations), 1000)

        # Verify content of second citations file (should have 200 records)
        with open(os.path.join(citations_dir, "1.csv"), "r") as f:
            citations = list(csv.DictReader(f))
            self.assertEqual(len(citations), 200)

    @patch("SPARQLWrapper.SPARQLWrapper.query")
    def test_check_triplestore_connection_success(self, mock_query):
        """A query that does not raise means the triplestore is reachable."""
        # Test successful connection
        result = check_triplestore_connection("http://example.com/sparql")
        self.assertTrue(result)
        mock_query.assert_called_once()

    @patch("SPARQLWrapper.SPARQLWrapper.query")
    def test_check_triplestore_connection_failure(self, mock_query):
        """A query that raises means the triplestore is unreachable."""
        # Test failed connection
        mock_query.side_effect = Exception("Connection failed")
        result = check_triplestore_connection("http://example.com/sparql")
        self.assertFalse(result)
        mock_query.assert_called_once()

    @patch("crowdsourcing.meta_runner.run_meta_process")
    def test_process_single_issue_success(self, mock_run_meta):
        """A valid issue is stored, processed, and its temp config cleaned up."""
        # Prepare test data
        issue = {
            "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n123,456",
            "number": "1",
            "user": {
                "login": "test-user",
                "html_url": "https://github.com/test-user",
                "id": 12345,
            },
        }
        base_settings = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/mnt/arcangelo/meta_output_current",
        }

        # Run the function
        success = process_single_issue(issue, base_settings)

        # Verify results
        self.assertTrue(success)
        mock_run_meta.assert_called_once()

        # Check that temporary config was created and then deleted
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        temp_config_path = os.path.join(base_dir, "meta_config_1.yaml")
        self.assertFalse(os.path.exists(temp_config_path))

        # Verify metadata and citations were stored
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertTrue(os.path.exists(metadata_dir))
        self.assertTrue(os.path.exists(citations_dir))
        self.assertEqual(len(os.listdir(metadata_dir)), 1)
        self.assertEqual(len(os.listdir(citations_dir)), 1)

    @patch("crowdsourcing.meta_runner.run_meta_process")
    def test_process_single_issue_meta_process_failure(self, mock_run_meta):
        """A failing meta process returns False and still cleans up the temp config."""
        # Prepare test data
        issue = {
            "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n123,456",
            "number": "1",
            "user": {
                "login": "test-user",
                "html_url": "https://github.com/test-user",
                "id": 12345,
            },
        }
        base_settings = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/mnt/arcangelo/meta_output_current",
        }

        # Make meta_process raise an exception
        mock_run_meta.side_effect = Exception("Meta process failed")

        # Run the function
        success = process_single_issue(issue, base_settings)

        # Verify results
        self.assertFalse(success)
        mock_run_meta.assert_called_once()

        # Check that temporary config was cleaned up
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        temp_config_path = os.path.join(base_dir, "meta_config_1.yaml")
        self.assertFalse(os.path.exists(temp_config_path))

    def test_process_single_issue_invalid_issue(self):
        """An issue body without the separator fails and writes nothing."""
        # Test with invalid issue data
        issue = {
            "body": "Invalid issue body without separator",
            "number": "1",
            "user": {
                "login": "test-user",
                "html_url": "https://github.com/test-user",
                "id": 12345,
            },
        }
        base_settings = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/mnt/arcangelo/meta_output_current",
        }

        # Run the function
        success = process_single_issue(issue, base_settings)

        # Verify results
        self.assertFalse(success)

        # Check that no files were created
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        metadata_dir = os.path.join(base_dir, "metadata")
        citations_dir = os.path.join(base_dir, "citations")
        self.assertTrue(os.path.exists(metadata_dir))
        self.assertTrue(os.path.exists(citations_dir))
        self.assertEqual(len(os.listdir(metadata_dir)), 0)
        self.assertEqual(len(os.listdir(citations_dir)), 0)

    @patch("crowdsourcing.meta_runner.run_meta_process")
    def test_process_single_issue_settings_update(self, mock_run_meta):
        """Per-issue settings (source URL, resp_agent, input dir) are derived correctly."""
        # Prepare test data
        issue = {
            "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n123,456",
            "number": "1",
            "user": {
                "login": "test-user",
                "html_url": "https://github.com/test-user",
                "id": 12345,
            },
        }
        base_settings = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/mnt/arcangelo/meta_output_current",
            "some_other_setting": "value",
        }

        # Run the function
        success = process_single_issue(issue, base_settings)

        # Verify results
        self.assertTrue(success)

        # Check that run_meta_process was called with correct settings
        called_settings = mock_run_meta.call_args[1]["settings"]
        self.assertEqual(
            called_settings["source"],
            f"https://github.com/{os.environ['GITHUB_REPOSITORY']}/issues/1",
        )
        self.assertEqual(
            called_settings["resp_agent"], "https://api.github.com/user/12345"
        )
        self.assertEqual(called_settings["some_other_setting"], "value")
        self.assertTrue(called_settings["input_csv_dir"].endswith("metadata"))

    @patch("crowdsourcing.meta_runner.run_meta_process")
    def test_process_single_issue_general_exception(self, mock_run_meta):
        """Test handling of general exceptions in process_single_issue."""
        # Prepare test data
        issue = {
            "body": "id,title\n1,Test Title\n===###===@@@===citing,cited\n123,456",
            "number": "1",
            "user": {
                "login": "test-user",
                "html_url": "https://github.com/test-user",
                "id": 12345,
            },
        }
        base_settings = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/test/output",
        }

        # Make run_meta_process raise a general exception
        mock_run_meta.side_effect = Exception("Unexpected error")

        # Run the function
        success = process_single_issue(issue, base_settings)

        # Verify results
        self.assertFalse(success)
        mock_run_meta.assert_called_once()

        # Check that temporary config was cleaned up
        base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
        temp_config_path = os.path.join(base_dir, "meta_config_1.yaml")
        self.assertFalse(os.path.exists(temp_config_path))

554 

555 

class TestUpdateIssueLabels(unittest.TestCase):
    """Test the update_issue_labels function."""

    def setUp(self):
        """Install fake GitHub credentials and precompute the request constants."""
        self.issue_number = "123"
        self.env_patcher = patch.dict(
            "os.environ",
            {"GH_TOKEN": "fake-token", "GITHUB_REPOSITORY": "test/repo"},
        )
        self.env_patcher.start()
        self.base_url = (
            f"https://api.github.com/repos/test/repo/issues/{self.issue_number}"
        )
        self.headers = {
            "Accept": "application/vnd.github+json",
            "Authorization": "Bearer fake-token",
            "X-GitHub-Api-Version": "2022-11-28",
        }

    def tearDown(self):
        """Restore the original environment."""
        self.env_patcher.stop()

    def _assert_label_removed(self, delete_mock):
        # Shared check: exactly one DELETE removing 'to be processed'.
        delete_mock.assert_called_once_with(
            f"{self.base_url}/labels/to%20be%20processed",
            headers=self.headers,
            timeout=30,
        )

    def _assert_label_added(self, post_mock, label):
        # Shared check: exactly one POST adding the given label.
        post_mock.assert_called_once_with(
            f"{self.base_url}/labels",
            headers=self.headers,
            json={"labels": [label]},
            timeout=30,
        )

    @patch("requests.delete")
    @patch("requests.post")
    def test_successful_update_on_success(self, post_mock, delete_mock):
        """A successful run swaps 'to be processed' for 'done'."""
        delete_mock.return_value.status_code = 200
        post_mock.return_value.status_code = 201

        update_issue_labels(self.issue_number, success=True)

        self._assert_label_removed(delete_mock)
        self._assert_label_added(post_mock, "done")

    @patch("requests.delete")
    @patch("requests.post")
    def test_successful_update_on_failure(self, post_mock, delete_mock):
        """A failed run swaps 'to be processed' for 'oc meta error'."""
        delete_mock.return_value.status_code = 200
        post_mock.return_value.status_code = 201

        update_issue_labels(self.issue_number, success=False)

        self._assert_label_removed(delete_mock)
        self._assert_label_added(post_mock, "oc meta error")

    @patch("requests.delete")
    def test_delete_label_error(self, delete_mock):
        """A network error while removing the label propagates to the caller."""
        delete_mock.side_effect = requests.RequestException("Network error")

        with self.assertRaises(requests.RequestException) as context:
            update_issue_labels(self.issue_number, success=True)

        self.assertEqual(str(context.exception), "Network error")
        delete_mock.assert_called_once()

    @patch("requests.delete")
    @patch("requests.post")
    def test_add_label_error(self, post_mock, delete_mock):
        """A network error while adding the new label propagates to the caller."""
        delete_mock.return_value.status_code = 200
        post_mock.side_effect = requests.RequestException("Network error")

        with self.assertRaises(requests.RequestException) as context:
            update_issue_labels(self.issue_number, success=True)

        self.assertEqual(str(context.exception), "Network error")
        delete_mock.assert_called_once()
        post_mock.assert_called_once()

    @patch("requests.delete")
    @patch("requests.post")
    @patch("crowdsourcing.meta_runner.logger.error")
    def test_delete_label_non_200_response(
        self, log_error_mock, post_mock, delete_mock
    ):
        """A non-200 delete response is logged but does not stop the update."""
        delete_mock.return_value.status_code = 404
        delete_mock.return_value.text = "Label not found"
        post_mock.return_value.status_code = 201

        update_issue_labels(self.issue_number, success=True)

        log_error_mock.assert_called_with(
            "Error response from delete label: Label not found"
        )
        # The new label is still added despite the failed delete
        post_mock.assert_called_once()

    @patch("requests.delete")
    @patch("requests.post")
    @patch("crowdsourcing.meta_runner.logger.error")
    def test_add_label_non_200_response(self, log_error_mock, post_mock, delete_mock):
        """A non-200/201 add-label response is logged."""
        delete_mock.return_value.status_code = 200
        post_mock.return_value.status_code = 422
        post_mock.return_value.text = "Validation failed"

        update_issue_labels(self.issue_number, success=True)

        log_error_mock.assert_called_with("Error adding label: Validation failed")

702 

703 

class TestProcessMetaIssues(unittest.TestCase):
    """Test the main process_meta_issues function."""

    def setUp(self):
        """Patch the environment and force yaml.safe_load to return a fixed config."""
        self.env_patcher = patch.dict(
            "os.environ",
            {"GH_TOKEN": "fake-token", "GITHUB_REPOSITORY": "test/repo"},
        )
        self.env_patcher.start()

        # Fixed configuration handed to every code path that loads YAML
        self.config_content = {
            "triplestore_url": "http://example.com/sparql",
            "base_output_dir": "/test/output",
        }
        self.yaml_patcher = patch("yaml.safe_load")
        self.mock_yaml_load = self.yaml_patcher.start()
        self.mock_yaml_load.return_value = self.config_content

    def tearDown(self):
        """Stop both patchers."""
        self.env_patcher.stop()
        self.yaml_patcher.stop()

    @patch("crowdsourcing.meta_runner.check_triplestore_connection")
    @patch("crowdsourcing.meta_runner.get_closed_issues")
    @patch("crowdsourcing.meta_runner.process_single_issue")
    @patch("crowdsourcing.meta_runner.update_issue_labels")
    def test_successful_processing(
        self, mock_update_labels, mock_process_issue, mock_get_issues, mock_check_conn
    ):
        """Every fetched issue is processed and labelled as successful."""
        mock_check_conn.return_value = True
        issues = [
            {"body": "test1", "number": "1"},
            {"body": "test2", "number": "2"},
        ]
        mock_get_issues.return_value = issues
        mock_process_issue.return_value = True

        process_meta_issues()

        self.assertEqual(mock_process_issue.call_count, 2)
        self.assertEqual(mock_update_labels.call_count, 2)

        # Each issue was handed to the processor with the loaded config,
        # and its labels were updated with a True outcome
        for issue in issues:
            mock_process_issue.assert_any_call(issue, self.config_content)
            mock_update_labels.assert_any_call(issue["number"], True)

    @patch("crowdsourcing.meta_runner.check_triplestore_connection")
    def test_triplestore_not_responsive(self, mock_check_conn):
        """Processing stops immediately when the triplestore check fails."""
        mock_check_conn.return_value = False

        process_meta_issues()

        mock_check_conn.assert_called_once_with(self.config_content["triplestore_url"])

    @patch("crowdsourcing.meta_runner.check_triplestore_connection")
    @patch("crowdsourcing.meta_runner.get_closed_issues")
    def test_no_issues_to_process(self, mock_get_issues, mock_check_conn):
        """The function returns early when there are no closed issues."""
        mock_check_conn.return_value = True
        mock_get_issues.return_value = []

        process_meta_issues()

        mock_get_issues.assert_called_once()

    @patch("crowdsourcing.meta_runner.check_triplestore_connection")
    @patch("crowdsourcing.meta_runner.get_closed_issues")
    @patch("crowdsourcing.meta_runner.process_single_issue")
    @patch("crowdsourcing.meta_runner.update_issue_labels")
    def test_mixed_processing_results(
        self, mock_update_labels, mock_process_issue, mock_get_issues, mock_check_conn
    ):
        """Each issue gets a label matching its own success or failure."""
        mock_check_conn.return_value = True
        mock_get_issues.return_value = [
            {"body": "test1", "number": "1"},
            {"body": "test2", "number": "2"},
        ]
        # First issue succeeds, second fails
        outcomes = [True, False]
        mock_process_issue.side_effect = outcomes

        process_meta_issues()

        self.assertEqual(mock_process_issue.call_count, 2)
        self.assertEqual(mock_update_labels.call_count, 2)

        for number, outcome in zip(("1", "2"), outcomes):
            mock_update_labels.assert_any_call(number, outcome)

    @patch("crowdsourcing.meta_runner.check_triplestore_connection")
    @patch("crowdsourcing.meta_runner.get_closed_issues")
    @patch("crowdsourcing.meta_runner.process_single_issue")
    @patch("crowdsourcing.meta_runner.update_issue_labels")
    def test_error_handling(
        self, mock_update_labels, mock_process_issue, mock_get_issues, mock_check_conn
    ):
        """An exception raised while processing an issue is propagated."""
        mock_check_conn.return_value = True
        mock_get_issues.return_value = [{"body": "test1", "number": "1"}]
        mock_process_issue.side_effect = Exception("Processing error")

        with self.assertRaises(Exception) as context:
            process_meta_issues()

        self.assertEqual(str(context.exception), "Processing error")

837 

838 

# Allow running this test module directly with `python test_meta_runner.py`.
if __name__ == "__main__":  # pragma: no cover
    unittest.main()