Coverage for crowdsourcing / archive_manager.py: 100%
96 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 14:31 +0000
1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import json
6import logging
7import os
8import re
9from datetime import datetime
10from pathlib import Path
11from typing import Optional
13import requests
14import yaml
15from crowdsourcing.zenodo_utils import (
16 create_deposition_resource,
17 get_zenodo_base_url,
18 get_zenodo_token,
19)
21# Configure logging
22logger = logging.getLogger(__name__)
25class ArchiveManager:
26 """Manages the archival of validation reports to Zenodo."""
28 def __init__(self, config_path: str = "archive_config.yaml"):
29 """Initialize the archive manager.
31 Args:
32 config_path: Path to the archive configuration file
33 """
34 self.config_path = config_path
35 self.config = self._load_config()
36 self.index_path = Path(self.config["validation_reports"]["index_file"])
37 self.reports_dir = Path(self.config["validation_reports"]["reports_dir"])
39 # Create reports directory if it doesn't exist
40 self.reports_dir.mkdir(parents=True, exist_ok=True)
42 # Create index file if it doesn't exist
43 if not self.index_path.exists():
44 self._init_index()
46 def _load_config(self) -> dict:
47 """Load the archive configuration file.
49 Raises:
50 FileNotFoundError: If the configuration file does not exist
51 """
52 with open(self.config_path) as f:
53 return yaml.safe_load(f)
55 def _init_index(self) -> None:
56 """Initialize the index file if it doesn't exist."""
57 index_data = {
58 "github_reports": {}, # filename -> github_url
59 "zenodo_reports": {}, # filename -> {"url": direct_file_url, "doi": doi_url}
60 "last_archive": None, # timestamp of last archive
61 }
63 with open(self.index_path, "w") as f:
64 json.dump(index_data, f, indent=2)
66 def _load_index(self) -> dict:
67 """Load the current index file."""
68 with open(self.index_path, "r", encoding="utf-8") as f:
69 return json.load(f)
71 def _save_index(self, index_data: dict) -> None:
72 """Save the index file."""
73 with open(self.index_path, "w", encoding="utf-8") as f:
74 json.dump(index_data, f, indent=2)
76 def add_report(self, report_filename: str, github_url: str) -> None:
77 """Add a new report to the index.
79 Args:
80 report_filename: Name of the report file
81 github_url: GitHub Pages URL where the report is hosted
82 """
83 # Ensure reports directory exists
84 self.reports_dir.mkdir(parents=True, exist_ok=True)
86 # Ensure index exists (needed when directory is recreated)
87 if not self.index_path.exists():
88 self._init_index()
90 index_data = self._load_index()
91 index_data["github_reports"][report_filename] = github_url
92 self._save_index(index_data)
94 def archive_reports(self) -> Optional[str]:
95 """Archive reports to Zenodo when threshold is reached.
97 Returns:
98 The DOI of the created Zenodo deposit, or None if no archival was needed
99 """
100 index_data = self._load_index()
101 github_reports = index_data["github_reports"]
103 # Get all reports to archive
104 reports_to_archive = sorted(
105 github_reports.keys(),
106 key=lambda x: (
107 os.path.getctime(os.path.join(self.reports_dir, x))
108 if os.path.exists(os.path.join(self.reports_dir, x))
109 else 0
110 ),
111 )
113 if not reports_to_archive:
114 return None
116 try:
117 # Create Zenodo deposition
118 base_url = get_zenodo_base_url()
119 date = datetime.now().strftime("%Y-%m-%d")
120 metadata = self.config["zenodo"]["metadata_template"].copy()
122 # Create a meaningful title with number of reports and date range
123 first_report_date = min(
124 datetime.fromtimestamp(
125 os.path.getctime(os.path.join(self.reports_dir, x))
126 ).strftime("%Y-%m-%d")
127 for x in reports_to_archive
128 if os.path.exists(os.path.join(self.reports_dir, x))
129 )
130 last_report_date = max(
131 datetime.fromtimestamp(
132 os.path.getctime(os.path.join(self.reports_dir, x))
133 ).strftime("%Y-%m-%d")
134 for x in reports_to_archive
135 if os.path.exists(os.path.join(self.reports_dir, x))
136 )
137 date_range = (
138 f"from {first_report_date} to {last_report_date}"
139 if first_report_date != last_report_date
140 else f"on {first_report_date}"
141 )
143 # Extract issue numbers from filenames
144 issue_numbers = []
145 for report in reports_to_archive:
146 match = re.search(r"validation_issue_(\d+)\.html", report)
147 if match:
148 issue_numbers.append(int(match.group(1)))
150 min_issue = min(issue_numbers) if issue_numbers else 0
151 max_issue = max(issue_numbers) if issue_numbers else 0
153 metadata["title"] = (
154 f"OpenCitations validation reports: issues #{min_issue} to #{max_issue} ({date_range})"
155 )
156 metadata["description"] = (
157 f"This deposit contains {len(reports_to_archive)} validation reports generated {date_range} to validate citation data and metadata submitted through GitHub issues in the OpenCitations crowdsourcing repository."
158 )
159 metadata["publication_date"] = date
161 deposition_id, bucket_url = create_deposition_resource(
162 date=date,
163 metadata=metadata,
164 base_url=base_url,
165 )
167 # Upload reports to Zenodo
168 for report in reports_to_archive:
169 report_path = os.path.join(self.reports_dir, report)
171 with open(report_path, "r", encoding="utf-8") as f:
172 r = requests.put(
173 f"{bucket_url}/{report}",
174 data=f,
175 params={"access_token": get_zenodo_token()},
176 )
177 r.raise_for_status()
179 # Publish the deposition
180 r = requests.post(
181 f"{base_url}/deposit/depositions/{deposition_id}/actions/publish",
182 params={"access_token": get_zenodo_token()},
183 )
184 r.raise_for_status()
186 response_data = r.json()
187 doi = response_data["doi"]
188 # Get the base URL for files (remove /api from base_url)
189 files_base_url = base_url
191 # Update index
192 for report in reports_to_archive:
193 report_path = os.path.join(self.reports_dir, report)
194 # Move from github_reports to zenodo_reports
195 github_url = github_reports.pop(report)
196 index_data["zenodo_reports"][report] = {
197 "url": f"{files_base_url}/records/{deposition_id}/files/{report}/content",
198 "doi": f"https://doi.org/{doi}",
199 }
200 # Delete the report file
201 os.remove(report_path)
203 index_data["last_archive"] = datetime.now().isoformat()
204 self._save_index(index_data)
206 return doi
208 except Exception as e:
209 logger.error(f"Failed to archive reports: {e}")
210 raise
212 def get_report_url(self, report_filename: str) -> Optional[str]:
213 """Get the current URL for a report.
215 Args:
216 report_filename: Name of the report file
218 Returns:
219 URL where the report can be found (either on GitHub or Zenodo)
220 """
221 index_data = self._load_index()
223 # Check GitHub reports first
224 if report_filename in index_data["github_reports"]:
225 return index_data["github_reports"][report_filename]
227 # Then check Zenodo - return direct URL if available
228 if report_filename in index_data["zenodo_reports"]:
229 zenodo_data = index_data["zenodo_reports"][report_filename]
230 return zenodo_data["url"] # Return direct URL as primary choice
232 return None
234 def needs_archival(self) -> bool:
235 """Check if reports need to be archived based on configuration threshold.
237 Returns:
238 bool: True if number of reports exceeds threshold, False otherwise
239 """
240 index_data = self._load_index()
241 return (
242 len(index_data["github_reports"])
243 >= self.config["validation_reports"]["max_reports_before_archive"]
244 )