Coverage for crowdsourcing / meta_runner.py: 100%

171 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-21 14:31 +0000

1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5 

6import csv 

7import io 

8import logging 

9import os 

10import time 

11from typing import List 

12 

13import requests 

14import yaml 

15from oc_meta.run.meta_process import run_meta_process 

16from SPARQLWrapper import JSON, SPARQLWrapper 

17 

18 

19logger = logging.getLogger(__name__) 

20 

21 

def dump_csv(data_to_store: List[dict], output_path: str) -> None:
    """Write a list of homogeneous dicts to *output_path* as a CSV file.

    The header row is derived from the keys of the first record, so every
    record is expected to share the same keys.

    Args:
        data_to_store: Records to write; must be non-empty.
        output_path: Destination CSV file path.

    Raises:
        ValueError: If *data_to_store* is empty (no header can be derived).
    """
    if not data_to_store:
        # Fail fast with a clear message instead of the opaque IndexError
        # that data_to_store[0] would raise on an empty list.
        raise ValueError("dump_csv requires at least one record")
    keys = data_to_store[0].keys()
    # Explicit UTF-8 avoids locale-dependent default encodings on CI runners.
    with open(output_path, "w", newline="", encoding="utf-8") as output_file:
        dict_writer = csv.DictWriter(output_file, keys)
        dict_writer.writeheader()
        dict_writer.writerows(data_to_store)

28 

29 

def check_triplestore_connection(endpoint_url: str) -> bool:
    """Probe the triplestore with a trivial SPARQL query.

    Any failure (connection error, HTTP error, malformed response) is
    logged and reported as an unresponsive endpoint rather than raised.

    Args:
        endpoint_url: The URL of the SPARQL endpoint

    Returns:
        bool: True if the triplestore answered the probe, False otherwise
    """
    try:
        probe = SPARQLWrapper(endpoint_url)
        probe.setQuery("SELECT ?s WHERE { ?s ?p ?o } LIMIT 1")
        probe.setReturnFormat(JSON)
        probe.query()
    except Exception as err:
        logger.error(f"Error connecting to triplestore at {endpoint_url}: {str(err)}")
        return False
    return True

48 

49 

def get_ingestion_dirs() -> tuple[str, str, str]:
    """Create (if needed) and return this month's ingestion directories.

    Produces a layout of the form:
        crowdsourcing_ingestion_data/
        └── YYYY_MM/
            ├── metadata/
            └── citations/

    Returns:
        tuple containing:
            - base_dir: path to the month-specific root of the ingestion tree
            - metadata_dir: path to the metadata subdirectory
            - citations_dir: path to the citations subdirectory
    """
    base_dir = os.path.join("crowdsourcing_ingestion_data", time.strftime("%Y_%m"))
    subdirs = {
        name: os.path.join(base_dir, name) for name in ("metadata", "citations")
    }
    # Ensure the full tree exists before returning the paths.
    for path in subdirs.values():
        os.makedirs(path, exist_ok=True)
    return base_dir, subdirs["metadata"], subdirs["citations"]

75 

76 

def _flush_full_batches(
    records: List[dict], out_dir: str, counter: int
) -> "tuple[List[dict], int]":
    """Write complete 1000-record batches to numbered CSVs in *out_dir*.

    Files are named "<counter>.csv". Returns the leftover records (fewer
    than 1000) together with the next file counter. Extracted because the
    identical flush loop was duplicated for metadata and citations.
    """
    while len(records) >= 1000:
        dump_csv(records[:1000], os.path.join(out_dir, f"{counter}.csv"))
        records = records[1000:]
        counter += 1
    return records, counter


def store_meta_input(issues: List[dict]) -> None:
    """Store metadata and citations from issues into CSV files.

    This function:
    1. Creates directory structure for current month's ingestion
    2. Extracts metadata and citations from each issue's body
    3. Stores data in CSV files, with a maximum of 1000 records per file

    An issue contributes nothing unless BOTH its metadata and citations
    sections parse to at least one CSV record.

    Args:
        issues: List of issue dictionaries containing body and number

    Raises:
        ValueError: If an issue's body doesn't split into exactly two
            sections around the expected separator
        IOError: If there are issues creating directories or writing files
    """
    _, metadata_dir, citations_dir = get_ingestion_dirs()
    metadata_to_store: List[dict] = []
    citations_to_store: List[dict] = []
    metadata_counter = 0
    citations_counter = 0

    for issue in issues:
        try:
            issue_body = issue["body"]
            if "===###===@@@===" not in issue_body:
                logger.warning(
                    f"Warning: Issue #{issue['number']} does not contain the expected separator"
                )
                continue

            # Split metadata and citations sections
            metadata_section, citations_section = [
                section.strip() for section in issue_body.split("===###===@@@===")
            ]

            if not metadata_section:
                logger.warning(
                    f"Warning: Issue #{issue['number']} has empty metadata section"
                )
                continue

            if not citations_section:
                logger.warning(
                    f"Warning: Issue #{issue['number']} has empty citations section"
                )
                continue

            # Parse both CSV payloads before touching the accumulators so a
            # half-valid issue contributes nothing.
            metadata = list(csv.DictReader(io.StringIO(metadata_section)))
            if not metadata:
                logger.warning(
                    f"Warning: Issue #{issue['number']} has no valid metadata records"
                )
                continue

            citations = list(csv.DictReader(io.StringIO(citations_section)))
            if not citations:
                logger.warning(
                    f"Warning: Issue #{issue['number']} has no valid citation records"
                )
                continue

            # Only extend the lists if both metadata and citations are valid
            metadata_to_store.extend(metadata)
            citations_to_store.extend(citations)

            # Flush any complete 1000-record batches as they accumulate.
            metadata_to_store, metadata_counter = _flush_full_batches(
                metadata_to_store, metadata_dir, metadata_counter
            )
            citations_to_store, citations_counter = _flush_full_batches(
                citations_to_store, citations_dir, citations_counter
            )

        except (KeyError, csv.Error) as e:
            logger.error(
                f"Error processing issue #{issue.get('number', 'unknown')}: {str(e)}"
            )
            continue

    # Write any remaining (partial) batches
    if metadata_to_store:
        dump_csv(
            metadata_to_store, os.path.join(metadata_dir, f"{metadata_counter}.csv")
        )
    if citations_to_store:
        dump_csv(
            citations_to_store,
            os.path.join(citations_dir, f"{citations_counter}.csv"),
        )

178 

179 

def get_closed_issues() -> List[dict]:
    """Fetch closed issues with 'to be processed' label using GitHub REST API.

    Authenticates with the GH_TOKEN environment variable against the
    repository named by GITHUB_REPOSITORY. Makes up to MAX_RETRIES attempts;
    a 403 with rate-limit headers waits out the advertised reset time and
    retries (consuming an attempt).

    Returns:
        A list of dicts with "body", "number" (stringified) and a "user"
        sub-dict (login/html_url/id) for each matching issue. Returns an
        empty list on a 404 or when all attempts finish without a 200.

    Raises:
        RuntimeError: If the final attempt fails with a request error or
            a missing key in the environment/response.
        KeyError: If GH_TOKEN is not set (raised before the retry loop).
    """
    MAX_RETRIES = 3
    RETRY_DELAY = 5  # seconds to wait between attempts after a request error

    headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {os.environ['GH_TOKEN']}",
    }

    for attempt in range(MAX_RETRIES):
        try:
            response = requests.get(
                f"https://api.github.com/repos/{os.environ['GITHUB_REPOSITORY']}/issues",
                params={
                    "state": "closed",
                    "labels": "to be processed",
                },
                headers=headers,
                timeout=30,
            )

            if response.status_code == 200:
                issues = response.json()
                logger.info(f"Found {len(issues)} issues")
                # Keep only the fields downstream processing needs.
                return [
                    {
                        "body": issue["body"],
                        "number": str(issue["number"]),
                        "user": {
                            "login": issue["user"]["login"],
                            "html_url": issue["user"]["html_url"],
                            "id": issue["user"]["id"],
                        },
                    }
                    for issue in issues
                ]

            elif response.status_code == 404:
                # Wrong repository or endpoint: retrying cannot help.
                logger.error("Repository or endpoint not found (404)")
                return []

            elif (
                response.status_code == 403
                and "X-RateLimit-Remaining" in response.headers
            ):
                logger.info(
                    f"Rate limit info: {response.headers.get('X-RateLimit-Remaining')} requests remaining"
                )
                if int(response.headers["X-RateLimit-Remaining"]) == 0:
                    # Quota exhausted: sleep until the reset timestamp,
                    # then retry with the next attempt.
                    reset_time = int(response.headers["X-RateLimit-Reset"])
                    current_time = time.time()
                    if reset_time > current_time:
                        sleep_time = reset_time - current_time
                        logger.info(
                            f"Rate limit exceeded. Waiting {sleep_time} seconds"
                        )
                        time.sleep(sleep_time)
                        continue
                # 403 with quota remaining (or reset already past): retry
                # immediately on the next attempt.
                continue
            else:
                # Unexpected status: log and fall through to the next attempt.
                logger.error(f"Unexpected status code: {response.status_code}")
                logger.error(f"Response body: {response.text}")

        except (requests.RequestException, KeyError) as e:
            logger.error(f"Error during request: {str(e)}")
            if attempt < MAX_RETRIES - 1:
                logger.info(f"Waiting {RETRY_DELAY} seconds before retry")
                time.sleep(RETRY_DELAY)
                continue
            # Last attempt failed: surface the failure to the caller.
            raise RuntimeError(
                f"Failed to fetch issues after {MAX_RETRIES} attempts"
            ) from e

    return []

255 

256 

def process_single_issue(issue: dict, base_settings: dict) -> bool:
    """Run the meta pipeline for one issue.

    Clones the base configuration, sets the issue's GitHub URL as the
    provenance source and the issue author's API URI as responsible agent,
    writes a temporary YAML config, runs the meta process, and removes the
    temporary config afterwards.

    Args:
        issue: Dictionary containing issue data
        base_settings: Base meta configuration settings

    Returns:
        bool: True if processing was successful, False otherwise
    """
    # Persist this issue's metadata and citations to CSV first.
    store_meta_input([issue])

    base_dir, metadata_dir, citations_dir = get_ingestion_dirs()

    issue_number = str(issue["number"])
    issue_settings = dict(base_settings)
    issue_settings["input_csv_dir"] = metadata_dir
    issue_settings["source"] = (
        f"https://github.com/{os.environ['GITHUB_REPOSITORY']}/issues/{issue_number}"
    )
    issue_settings["resp_agent"] = f"https://api.github.com/user/{issue['user']['id']}"

    # Materialize the per-issue settings as a temporary YAML config.
    temp_config_path = os.path.join(base_dir, f"meta_config_{issue_number}.yaml")
    with open(temp_config_path, "w", encoding="utf-8") as config_file:
        yaml.safe_dump(issue_settings, config_file)

    try:
        run_meta_process(
            settings=issue_settings,
            meta_config_path=temp_config_path,
            resp_agents_only=False,
        )
        return True
    except Exception as e:
        logger.error(f"Error processing issue #{issue_number}: {str(e)}")
        return False
    finally:
        # Always remove the temporary config, success or not.
        if os.path.exists(temp_config_path):
            os.remove(temp_config_path)

304 

305 

def update_issue_labels(issue_number: str, success: bool) -> None:
    """Update issue labels based on processing result using GitHub REST API.

    Removes the 'to be processed' label, then adds 'done' on success or
    'oc meta error' on failure. HTTP-level failures are logged; only
    transport errors are re-raised.

    Args:
        issue_number: The issue number to update
        success: Whether the processing was successful
    """
    logger.info(f"Updating labels for issue #{issue_number} (success={success})")

    headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {os.environ['GH_TOKEN']}",
        "X-GitHub-Api-Version": "2022-11-28",
    }

    base_url = f"https://api.github.com/repos/{os.environ['GITHUB_REPOSITORY']}/issues/{issue_number}"
    logger.info(f"Using API URL: {base_url}")

    try:
        # Step 1: drop the queue label ('to be processed', URL-encoded).
        logger.info("Attempting to remove 'to be processed' label")
        removal = requests.delete(
            f"{base_url}/labels/to%20be%20processed",
            headers=headers,
            timeout=30,
        )
        logger.info(f"Delete label response status: {removal.status_code}")
        if removal.status_code != 200:
            logger.error(f"Error response from delete label: {removal.text}")

        # Step 2: attach the outcome label.
        new_label = "done" if success else "oc meta error"
        logger.info(f"Attempting to add new label: {new_label}")
        addition = requests.post(
            f"{base_url}/labels",
            headers=headers,
            json={"labels": [new_label]},
            timeout=30,
        )
        logger.info(f"Add label response status: {addition.status_code}")
        if addition.status_code in (200, 201):  # Both are valid success codes
            logger.info(
                f"Successfully added label '{new_label}' to issue #{issue_number}"
            )
        else:
            logger.error(f"Error adding label: {addition.text}")

    except requests.RequestException as e:
        logger.error(f"Error updating labels for issue {issue_number}: {e}")
        raise

356 

357 

def process_meta_issues() -> None:
    """Process closed issues with 'to be processed' label for meta data extraction.

    This function:
    1. Checks triplestore connection
    2. Fetches closed issues with 'to be processed' label
    3. Processes each issue individually:
       - Updates meta configuration with issue URI as source
       - Processes metadata and citations
       - Runs meta processing pipeline
    4. Updates issue labels based on processing results
    """
    try:
        # Load base meta configuration from the repository root.
        with open("meta_config.yaml", encoding="utf-8") as config_file:
            base_settings = yaml.safe_load(config_file)

        # Abort early if the triplestore cannot be reached.
        if not check_triplestore_connection(base_settings["triplestore_url"]):
            logger.error("Triplestore is not responsive, aborting process")
            return

        issues = get_closed_issues()
        if not issues:
            logger.info("No issues to process")
            return

        for issue in issues:
            issue_number = str(issue["number"])
            logger.info(f"\nProcessing issue #{issue_number}")
            outcome = process_single_issue(issue, base_settings)
            update_issue_labels(issue_number, outcome)

    except Exception as e:
        logger.error(f"Error in process_meta_issues: {str(e)}", exc_info=True)
        raise

398 

399 

# Script entry point: run the full crowdsourcing ingestion pipeline.
if __name__ == "__main__":  # pragma: no cover
    process_meta_issues()