Coverage for crowdsourcing / meta_runner.py: 100%
171 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 14:31 +0000
1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
6import csv
7import io
8import logging
9import os
10import time
11from typing import List
13import requests
14import yaml
15from oc_meta.run.meta_process import run_meta_process
16from SPARQLWrapper import JSON, SPARQLWrapper
# Module-level logger named after this module, per the logging convention.
logger = logging.getLogger(__name__)
def dump_csv(data_to_store: List[dict], output_path: str) -> None:
    """Write a list of homogeneous dicts to a CSV file.

    The header row is taken from the keys of the first record, so all
    records are expected to share the same keys.

    Args:
        data_to_store: Records to serialize. If empty, no file is written.
        output_path: Destination path of the CSV file (overwritten if present).
    """
    if not data_to_store:
        # Guard: the original indexed data_to_store[0] and crashed on [].
        return
    keys = data_to_store[0].keys()
    # newline="" is required by the csv module; explicit utf-8 keeps the
    # output independent of the platform's default encoding.
    with open(output_path, "w", newline="", encoding="utf-8") as output_file:
        dict_writer = csv.DictWriter(output_file, keys)
        dict_writer.writeheader()
        dict_writer.writerows(data_to_store)
def check_triplestore_connection(endpoint_url: str) -> bool:
    """Probe the triplestore with a trivial SPARQL query.

    Args:
        endpoint_url: The URL of the SPARQL endpoint

    Returns:
        bool: True if the triplestore is responsive, False otherwise
    """
    try:
        probe = SPARQLWrapper(endpoint_url)
        probe.setQuery("SELECT ?s WHERE { ?s ?p ?o } LIMIT 1")
        probe.setReturnFormat(JSON)
        probe.query()
    except Exception as e:
        # Deliberately broad: any failure just means "not responsive".
        logger.error(f"Error connecting to triplestore at {endpoint_url}: {str(e)}")
        return False
    return True
def get_ingestion_dirs() -> tuple[str, str, str]:
    """Create and return paths for ingestion directories.

    Creates a directory structure like:
        crowdsourcing_ingestion_data/
        └── YYYY_MM/
            ├── metadata/
            └── citations/

    Returns:
        tuple containing:
            - base_dir: Path to the main ingestion directory for this month
            - metadata_dir: Path to metadata directory
            - citations_dir: Path to citations directory
    """
    month_stamp = time.strftime("%Y_%m")
    base_dir = os.path.join("crowdsourcing_ingestion_data", month_stamp)
    subdirs = {
        name: os.path.join(base_dir, name) for name in ("metadata", "citations")
    }
    # Idempotent: existing directories are left untouched.
    for path in subdirs.values():
        os.makedirs(path, exist_ok=True)
    return base_dir, subdirs["metadata"], subdirs["citations"]
def _flush_full_batches(
    records: List[dict], counter: int, directory: str
) -> tuple[List[dict], int]:
    """Write complete 1000-record batches to numbered CSV files.

    Files are named "<counter>.csv" inside *directory*. Returns the leftover
    records (fewer than 1000) and the next file counter.
    """
    while len(records) >= 1000:
        dump_csv(records[:1000], os.path.join(directory, f"{counter}.csv"))
        records = records[1000:]
        counter += 1
    return records, counter


def store_meta_input(issues: List[dict]) -> None:
    """Store metadata and citations from issues into CSV files.

    This function:
    1. Creates directory structure for current month's ingestion
    2. Extracts metadata and citations from each issue's body
    3. Stores data in CSV files, with a maximum of 1000 records per file

    Issues with a missing or duplicated separator, empty sections, or no
    parsable records are skipped with a warning instead of raising.

    Args:
        issues: List of issue dictionaries containing body and number

    Raises:
        IOError: If there are issues creating directories or writing files
    """
    _, metadata_dir, citations_dir = get_ingestion_dirs()
    metadata_to_store: List[dict] = []
    citations_to_store: List[dict] = []
    metadata_counter = 0
    citations_counter = 0

    for issue in issues:
        try:
            issue_body = issue["body"]
            sections = issue_body.split("===###===@@@===")
            if len(sections) != 2:
                # Covers both a missing separator and more than one
                # occurrence; the previous two-element unpacking raised an
                # uncaught ValueError when the separator appeared twice.
                logger.warning(
                    f"Warning: Issue #{issue['number']} does not contain the expected separator"
                )
                continue

            # Split metadata and citations sections
            metadata_section, citations_section = (
                section.strip() for section in sections
            )

            if not metadata_section:
                logger.warning(
                    f"Warning: Issue #{issue['number']} has empty metadata section"
                )
                continue

            if not citations_section:
                logger.warning(
                    f"Warning: Issue #{issue['number']} has empty citations section"
                )
                continue

            # Parse both CSV sections before committing either one.
            metadata = list(csv.DictReader(io.StringIO(metadata_section)))
            if not metadata:
                logger.warning(
                    f"Warning: Issue #{issue['number']} has no valid metadata records"
                )
                continue

            citations = list(csv.DictReader(io.StringIO(citations_section)))
            if not citations:
                logger.warning(
                    f"Warning: Issue #{issue['number']} has no valid citation records"
                )
                continue

            # Only extend the lists if both metadata and citations are valid
            metadata_to_store.extend(metadata)
            citations_to_store.extend(citations)

            # Flush full 1000-record batches to disk as they accumulate.
            metadata_to_store, metadata_counter = _flush_full_batches(
                metadata_to_store, metadata_counter, metadata_dir
            )
            citations_to_store, citations_counter = _flush_full_batches(
                citations_to_store, citations_counter, citations_dir
            )

        except (KeyError, csv.Error) as e:
            logger.error(
                f"Error processing issue #{issue.get('number', 'unknown')}: {str(e)}"
            )
            continue

    # Write any remaining partial batches.
    if metadata_to_store:
        dump_csv(
            metadata_to_store, os.path.join(metadata_dir, f"{metadata_counter}.csv")
        )
    if citations_to_store:
        dump_csv(
            citations_to_store,
            os.path.join(citations_dir, f"{citations_counter}.csv"),
        )
def get_closed_issues() -> List[dict]:
    """Fetch closed issues with 'to be processed' label using GitHub REST API.

    Reads GH_TOKEN and GITHUB_REPOSITORY from the environment.

    Returns:
        A list of slimmed-down issue dicts (body, number as str, and user
        login/html_url/id), or an empty list on 404 / when all attempts
        yield unexpected status codes.

    Raises:
        RuntimeError: If a request or parsing error occurs on the final attempt.
        KeyError: If GH_TOKEN or GITHUB_REPOSITORY is not set (propagates
            from os.environ before the retry loop is entered).
    """
    MAX_RETRIES = 3
    RETRY_DELAY = 5  # seconds to wait between attempts after a request error

    headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {os.environ['GH_TOKEN']}",
    }

    for attempt in range(MAX_RETRIES):
        try:
            # NOTE(review): no pagination parameters are sent, so only the
            # first page of results is returned — confirm the issue volume
            # stays within GitHub's default page size.
            response = requests.get(
                f"https://api.github.com/repos/{os.environ['GITHUB_REPOSITORY']}/issues",
                params={
                    "state": "closed",
                    "labels": "to be processed",
                },
                headers=headers,
                timeout=30,
            )

            if response.status_code == 200:
                issues = response.json()
                logger.info(f"Found {len(issues)} issues")
                # Keep only the fields downstream processing needs; the
                # issue number is normalized to str.
                return [
                    {
                        "body": issue["body"],
                        "number": str(issue["number"]),
                        "user": {
                            "login": issue["user"]["login"],
                            "html_url": issue["user"]["html_url"],
                            "id": issue["user"]["id"],
                        },
                    }
                    for issue in issues
                ]

            elif response.status_code == 404:
                # Repository missing or token lacks access: not retryable.
                logger.error("Repository or endpoint not found (404)")
                return []

            elif (
                response.status_code == 403
                and "X-RateLimit-Remaining" in response.headers
            ):
                logger.info(
                    f"Rate limit info: {response.headers.get('X-RateLimit-Remaining')} requests remaining"
                )
                if int(response.headers["X-RateLimit-Remaining"]) == 0:
                    reset_time = int(response.headers["X-RateLimit-Reset"])
                    current_time = time.time()
                    if reset_time > current_time:
                        # Sleep until the rate-limit window resets, then retry.
                        sleep_time = reset_time - current_time
                        logger.info(
                            f"Rate limit exceeded. Waiting {sleep_time} seconds"
                        )
                        time.sleep(sleep_time)
                        continue
                    # Reset time already passed: retry immediately.
                    continue
            else:
                # Any other status: log and fall through to the next attempt
                # without sleeping.
                logger.error(f"Unexpected status code: {response.status_code}")
                logger.error(f"Response body: {response.text}")

        except (requests.RequestException, KeyError) as e:
            logger.error(f"Error during request: {str(e)}")
            if attempt < MAX_RETRIES - 1:
                logger.info(f"Waiting {RETRY_DELAY} seconds before retry")
                time.sleep(RETRY_DELAY)
                continue
            # Final attempt failed: surface the underlying cause.
            raise RuntimeError(
                f"Failed to fetch issues after {MAX_RETRIES} attempts"
            ) from e

    # All attempts exhausted without a usable response.
    return []
def process_single_issue(issue: dict, base_settings: dict) -> bool:
    """Run the meta pipeline for one issue, using the issue URI as source.

    Args:
        issue: Dictionary containing issue data
        base_settings: Base meta configuration settings

    Returns:
        bool: True if processing was successful, False otherwise
    """
    # Persist this issue's metadata and citations as CSV input files.
    store_meta_input([issue])

    base_dir, metadata_dir, citations_dir = get_ingestion_dirs()

    issue_number = str(issue["number"])
    repo = os.environ["GITHUB_REPOSITORY"]
    # Derive per-issue settings: the issue URI becomes the provenance
    # source and the GitHub user the responsible agent.
    issue_settings = dict(
        base_settings,
        input_csv_dir=metadata_dir,
        source=f"https://github.com/{repo}/issues/{issue_number}",
        resp_agent=f"https://api.github.com/user/{issue['user']['id']}",
    )

    # Write a temporary per-issue config file for the meta pipeline.
    temp_config_path = os.path.join(base_dir, f"meta_config_{issue_number}.yaml")
    with open(temp_config_path, "w", encoding="utf-8") as f:
        yaml.safe_dump(issue_settings, f)

    try:
        run_meta_process(
            settings=issue_settings,
            meta_config_path=temp_config_path,
            resp_agents_only=False,
        )
    except Exception as e:
        logger.error(f"Error processing issue #{issue_number}: {str(e)}")
        return False
    else:
        return True
    finally:
        # Always remove the temporary config, success or failure.
        if os.path.exists(temp_config_path):
            os.remove(temp_config_path)
def update_issue_labels(issue_number: str, success: bool) -> None:
    """Swap processing labels on an issue via the GitHub REST API.

    Removes the 'to be processed' label, then adds 'done' on success or
    'oc meta error' on failure.

    Args:
        issue_number: The issue number to update
        success: Whether the processing was successful

    Raises:
        requests.RequestException: If either API request fails to complete.
    """
    logger.info(f"Updating labels for issue #{issue_number} (success={success})")

    headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {os.environ['GH_TOKEN']}",
        "X-GitHub-Api-Version": "2022-11-28",
    }

    base_url = f"https://api.github.com/repos/{os.environ['GITHUB_REPOSITORY']}/issues/{issue_number}"
    logger.info(f"Using API URL: {base_url}")

    try:
        # Step 1: drop the queue label (URL-encoded spaces).
        logger.info("Attempting to remove 'to be processed' label")
        delete_resp = requests.delete(
            f"{base_url}/labels/to%20be%20processed",
            headers=headers,
            timeout=30,
        )
        logger.info(f"Delete label response status: {delete_resp.status_code}")
        if delete_resp.status_code != 200:
            # Non-fatal: proceed to add the outcome label regardless.
            logger.error(f"Error response from delete label: {delete_resp.text}")

        # Step 2: attach the outcome label.
        new_label = "done" if success else "oc meta error"
        logger.info(f"Attempting to add new label: {new_label}")
        add_resp = requests.post(
            f"{base_url}/labels",
            headers=headers,
            json={"labels": [new_label]},
            timeout=30,
        )
        logger.info(f"Add label response status: {add_resp.status_code}")
        if add_resp.status_code in (200, 201):  # Both are valid success codes
            logger.info(
                f"Successfully added label '{new_label}' to issue #{issue_number}"
            )
        else:
            logger.error(f"Error adding label: {add_resp.text}")

    except requests.RequestException as e:
        logger.error(f"Error updating labels for issue {issue_number}: {e}")
        raise
def process_meta_issues() -> None:
    """Process closed issues with 'to be processed' label for meta data extraction.

    Workflow:
    1. Load the base meta configuration and verify the triplestore is reachable.
    2. Fetch closed issues carrying the 'to be processed' label.
    3. Run the meta pipeline once per issue (issue URI as provenance source).
    4. Relabel each issue according to its processing outcome.
    """
    try:
        with open("meta_config.yaml", encoding="utf-8") as f:
            base_settings = yaml.safe_load(f)

        # Abort early when the triplestore cannot be reached.
        if not check_triplestore_connection(base_settings["triplestore_url"]):
            logger.error("Triplestore is not responsive, aborting process")
            return

        issues = get_closed_issues()
        if not issues:
            logger.info("No issues to process")
            return

        for issue in issues:
            issue_number = str(issue["number"])
            logger.info(f"\nProcessing issue #{issue_number}")
            ok = process_single_issue(issue, base_settings)
            update_issue_labels(issue_number, ok)

    except Exception as e:
        # Log with traceback, then let the failure propagate to the caller.
        logger.error(f"Error in process_meta_issues: {str(e)}", exc_info=True)
        raise
# Script entry point: run the full crowdsourcing ingestion workflow.
if __name__ == "__main__":  # pragma: no cover
    process_meta_issues()