Coverage for heritrace / apis / zenodo.py: 99%
185 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-07-02 10:16 +0000
1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from datetime import datetime
6from functools import lru_cache
7from http import HTTPStatus
8from time import sleep
9from typing import TypedDict
10from urllib.parse import urlparse
12import requests
13from requests.exceptions import ConnectionError as RequestsConnectionError
14from requests.exceptions import RequestException, Timeout
class ZenodoRecord(TypedDict):
    """Flattened subset of a Zenodo record's metadata, as built by ``get_zenodo_data``."""

    # Record title.
    title: str
    # One dict per creator, each with "name", "orcid" and "affiliation" keys.
    authors: list[dict[str, str]]
    # DOI of the record; None when the metadata carries no DOI.
    doi: str | None
    # Publication date string as delivered by the API; None when absent.
    publication_date: str | None
    # Version label of the record.
    version: str
    # Resource type and subtype, read from the metadata's "resource_type" object.
    type: str
    subtype: str
    # Journal title, volume, issue and pages, read from the "journal" object.
    journal: str
    journal_volume: str
    journal_issue: str
    journal_pages: str
    # Conference title, acronym, place and date, read from the "conference" object.
    conference: str
    conference_acronym: str
    conference_place: str
    conference_date: str
    # Publisher name.
    publisher: str
    # Keyword list.
    keywords: list[str]
    # Free-text description.
    description: str
    # Access right label.
    access_right: str
    # Language label.
    language: str
    # The record identifier that was passed to ``get_zenodo_data``.
    record_id: str
    # Additional notes.
    notes: str
# Author count at which format_authors_apa switches to the two-author
# "A & B" form; longer lists are joined with commas and end in ", & <last>".
_MIN_AUTHORS_FOR_ET_AL = 2
class ZenodoRequestError(Exception):
    """Raised when a request to the Zenodo API cannot be completed successfully."""
def is_zenodo_url(url: str) -> bool:
    """Tell whether ``url`` points at Zenodo.

    Accepted forms are a URL on the Zenodo host, a ``doi.org`` URL whose DOI
    starts with the Zenodo prefix, or a bare DOI starting with that prefix.
    Non-string input is never a Zenodo URL.
    """
    if not isinstance(url, str):
        return False

    zenodo_doi_prefix = "10.5281/zenodo."
    components = urlparse(url)
    host = components.netloc

    if host in ("zenodo.org", "www.zenodo.org"):
        return True
    if host in ("doi.org", "www.doi.org"):
        # The DOI is the URL path without its leading slash.
        return components.path.lstrip("/").startswith(zenodo_doi_prefix)
    # No recognised host: accept only a bare Zenodo DOI.
    return url.startswith(zenodo_doi_prefix)
62def extract_zenodo_id(url: str) -> str | None:
63 """
64 Extract Zenodo record ID from URL or DOI.
66 Args:
67 url (str): The URL or DOI to parse
69 Returns:
70 str: The Zenodo record ID or None if not found
71 """
72 if not isinstance(url, str):
73 return None
74 parsed = urlparse(url)
75 if parsed.netloc in ["doi.org", "www.doi.org"]:
76 doi_path = parsed.path.lstrip("/")
77 if doi_path.startswith("10.5281/zenodo."):
78 return doi_path.split("10.5281/zenodo.")[1]
79 elif parsed.netloc in ["zenodo.org", "www.zenodo.org"]:
80 path_parts = parsed.path.strip("/").split("/")
81 if "record" in path_parts:
82 return path_parts[path_parts.index("record") + 1]
83 if "records" in path_parts:
84 return path_parts[path_parts.index("records") + 1]
85 elif url.startswith("10.5281/zenodo."):
86 return url.split("10.5281/zenodo.")[1]
87 return None
def make_request_with_retry(
    url: str,
    headers: dict[str, str],
    max_retries: int = 3,
    initial_delay: float = 1,
) -> requests.Response:
    """
    Make HTTP GET request with exponential backoff retry strategy.

    Every attempt uses a 5 second timeout. An attempt is retried when the
    request raises, when the response status is 429 or 5xx, or when
    ``raise_for_status`` rejects the response.

    Args:
        url (str): The URL to request
        headers (dict): Request headers
        max_retries (int): Maximum number of attempts
        initial_delay (float): Initial delay between retries in seconds;
            doubled after each failed attempt

    Returns:
        requests.Response: The response if successful

    Raises:
        ZenodoRequestError: If all attempts fail
    """
    delay = initial_delay
    last_exception: Exception | None = None

    for attempt in range(max_retries):
        has_attempts_left = attempt < max_retries - 1
        try:
            response = requests.get(url, headers=headers, timeout=5)

            # Check if we got rate limited
            if response.status_code == HTTPStatus.TOO_MANY_REQUESTS:
                # Remember the cause so the final error message is meaningful
                # when every attempt is rate limited.
                last_exception = ZenodoRequestError("Rate limited (HTTP 429)")
                if has_attempts_left:
                    # Retry-After may be missing or an HTTP-date; only a plain
                    # number of seconds is honoured, otherwise use the backoff.
                    retry_after = str(response.headers.get("Retry-After", "")).strip()
                    sleep(int(retry_after) if retry_after.isdecimal() else delay)
                continue

            # If we get a 5xx error, retry (handled by the except clause below)
            if HTTPStatus.INTERNAL_SERVER_ERROR <= response.status_code < 600:  # noqa: PLR2004
                msg = f"Server error: {response.status_code}"
                raise ZenodoRequestError(msg)

            response.raise_for_status()

        except (
            RequestException,
            RequestsConnectionError,
            Timeout,
            # Without this entry the 5xx error above would escape the loop
            # immediately instead of being retried.
            ZenodoRequestError,
        ) as e:
            last_exception = e

            # Don't sleep after the last attempt
            if has_attempts_left:
                sleep(delay)
                delay *= 2  # Exponential backoff
        else:
            return response

    msg = f"Failed after {max_retries} attempts. Last error: {last_exception!s}"
    raise ZenodoRequestError(msg)
@lru_cache(maxsize=1000)
def get_zenodo_data(
    record_id: str,
) -> ZenodoRecord | None:
    """Fetch record data from Zenodo API with caching and retry logic.

    Args:
        record_id: Zenodo record identifier, interpolated into the API URL.

    Returns:
        The flattened record metadata, or None when the request fails or the
        response body is not a JSON object.

    Note:
        Results are memoised by ``lru_cache``; this includes None results, which
        stay cached until evicted or until ``get_zenodo_data.cache_clear()``.
    """
    headers = {
        "Accept": "application/json",
        "User-Agent": "YourApp/1.0 (your@email.com)",
    }

    try:
        response = make_request_with_retry(
            f"https://zenodo.org/api/records/{record_id}", headers=headers
        )
        # A non-JSON body raises a ValueError subclass; treat it like a failed
        # request instead of letting it propagate to the caller.
        data = response.json()
    except (ZenodoRequestError, ValueError):
        return None

    if not isinstance(data, dict):
        return None

    # "or {}" / "or []" also covers keys that are present with a null value,
    # which a plain .get(key, default) would pass through as None.
    metadata = data.get("metadata") or {}
    resource_type = metadata.get("resource_type") or {}
    journal = metadata.get("journal") or {}
    conference = metadata.get("conference") or {}

    # Extract all possible metadata for APA citation
    return {
        # Always a string, as declared by ZenodoRecord.
        "title": metadata.get("title") or "",
        "authors": [
            {
                "name": creator.get("name", ""),
                "orcid": creator.get("orcid", ""),
                "affiliation": creator.get("affiliation", ""),
            }
            for creator in metadata.get("creators") or []
        ],
        "doi": metadata.get("doi"),
        "publication_date": metadata.get("publication_date"),
        "version": metadata.get("version", ""),
        "type": resource_type.get("type", ""),
        "subtype": resource_type.get("subtype", ""),
        "journal": journal.get("title", ""),
        "journal_volume": journal.get("volume", ""),
        "journal_issue": journal.get("issue", ""),
        "journal_pages": journal.get("pages", ""),
        "conference": conference.get("title", ""),
        "conference_acronym": conference.get("acronym", ""),
        "conference_place": conference.get("place", ""),
        "conference_date": conference.get("date", ""),
        "publisher": metadata.get("publisher", ""),
        "keywords": metadata.get("keywords") or [],
        "description": metadata.get("description", ""),
        "access_right": metadata.get("access_right", ""),
        "language": metadata.get("language", ""),
        "record_id": record_id,
        "notes": metadata.get("notes", ""),
    }
def format_apa_date(date_str: str) -> str:
    """Render an ISO-like date string as ``YYYY, Month DD``.

    Full dates, year-month values and bare years are recognised, in that
    order; the output keeps only the precision the input had. Anything that
    matches none of these is returned unchanged.
    """
    # (input pattern, output pattern), tried from most to least specific.
    known_formats = (
        ("%Y-%m-%d", "%Y, %B %d"),
        ("%Y-%m", "%Y, %B"),
        ("%Y", "%Y"),
    )
    for input_pattern, output_pattern in known_formats:
        try:
            return datetime.strptime(date_str, input_pattern).strftime(output_pattern)
        except ValueError:
            continue
    return date_str
def format_authors_apa(authors: list[dict[str, str]]) -> str:
    """Format author list in APA style.

    Args:
        authors: Author dicts; only the ``"name"`` key is read.

    Returns:
        An empty string for no authors; the single name (with whitespace
        around its first comma normalised) for one author; ``"A & B"`` for
        two; and ``"A, B, & C"`` for more.
    """
    if not authors:
        return ""

    names = [author["name"] for author in authors]

    if len(names) == 1:
        # Split on the first comma ("Lastname, Firstname") and tidy the
        # whitespace around it; names without a comma are returned as-is.
        last_name, comma, given_names = names[0].partition(",")
        if comma:
            return f"{last_name.strip()}, {given_names.strip()}"
        return names[0]

    if len(names) == _MIN_AUTHORS_FOR_ET_AL:
        return f"{names[0]} & {names[1]}"

    # Three or more authors: every count is covered above, so no fallback
    # return is needed after this point.
    return f"{', '.join(names[:-1])}, & {names[-1]}"
238def _build_citation_parts(
239 record_data: ZenodoRecord,
240) -> list[str]:
241 citation_parts: list[str] = []
243 authors = format_authors_apa(record_data["authors"])
244 pub_date = (
245 format_apa_date(record_data["publication_date"])
246 if record_data["publication_date"]
247 else "n.d."
248 )
249 citation_parts.append(f"{authors} ({pub_date})")
251 title = record_data["title"]
252 if record_data["type"] == "dataset":
253 title = f"{title} [Data set]"
254 elif record_data["type"] == "software":
255 title = f"{title} [Computer software]"
256 citation_parts.append(title)
258 _append_container_info(citation_parts, record_data)
260 if record_data["publisher"]:
261 citation_parts.append(record_data["publisher"])
262 if record_data["version"]:
263 citation_parts.append(f"Version {record_data['version']}")
264 if record_data["doi"]:
265 citation_parts.append(f"https://doi.org/{record_data['doi']}")
267 return citation_parts
270def _append_container_info(
271 citation_parts: list[str],
272 record_data: ZenodoRecord,
273) -> None:
274 if record_data["journal"]:
275 journal_info = [record_data["journal"]]
276 if record_data["journal_volume"]:
277 journal_info.append(f"{record_data['journal_volume']}")
278 if record_data["journal_issue"]:
279 journal_info.append(f"({record_data['journal_issue']})")
280 if record_data["journal_pages"]:
281 journal_info.append(f", {record_data['journal_pages']}")
282 citation_parts.append(", ".join(journal_info))
283 elif record_data["conference"]:
284 conf_info = [f"In {record_data['conference']}"]
285 if record_data["conference_place"]:
286 conf_info.append(f" ({record_data['conference_place']})")
287 citation_parts.append("".join(conf_info))
290def _build_extra_info(
291 record_data: ZenodoRecord,
292) -> list[str]:
293 extra_info: list[str] = []
294 if record_data["type"]:
295 extra_info.append(f"Type: {record_data['type']}")
296 if record_data["subtype"]:
297 extra_info[-1] += f" ({record_data['subtype']})"
298 if record_data["keywords"]:
299 extra_info.append(f"Keywords: {', '.join(record_data['keywords'])}")
300 if record_data["language"]:
301 extra_info.append(f"Language: {record_data['language']}")
302 if record_data["access_right"]:
303 extra_info.append(f"Access: {record_data['access_right']}")
304 return extra_info
def format_zenodo_source(url: str) -> str:
    """Format Zenodo source for display with full APA citation.

    Args:
        url: URL or DOI of the source.

    Returns:
        An HTML fragment. When no record ID can be extracted from ``url`` or
        the record cannot be fetched, this is a plain link to ``url``;
        otherwise it is a link containing the Zenodo logo and the citation,
        followed by a line of extra details when there are any.
    """
    # Imported here so the function carries its own dependency.
    from html import escape

    # The result is raw HTML, while the URL and the Zenodo metadata are
    # external data: escape every interpolated value so it cannot inject markup.
    # NOTE(review): escaping does not restrict the URL scheme (e.g.
    # "javascript:"); confirm callers only pass trusted or validated URLs.
    safe_url = escape(str(url), quote=True)
    plain_link = f'<a href="{safe_url}" target="_blank">{safe_url}</a>'

    record_id = extract_zenodo_id(url)
    if not record_id:
        return plain_link

    record_data = get_zenodo_data(record_id)
    if not record_data:
        return plain_link

    citation_parts = _build_citation_parts(record_data)
    # Empty or missing fragments are skipped so they can neither break the
    # join nor leave a dangling ". " separator.
    citation = ". ".join(escape(str(part)) for part in citation_parts if part)

    markup = (
        f'<a href="{safe_url}" target="_blank" class="zenodo-attribution">'
        '<img src="/static/images/zenodo-logo.png"'
        ' alt="Zenodo" class="zenodo-icon mb-1 mx-1"'
        ' style="width: 50px; height: 25px;'
        ' margin-bottom: .3rem !important">'
        f"{citation}</a>"
    )

    extra_info = _build_extra_info(record_data)
    if extra_info:
        details = " | ".join(escape(str(item)) for item in extra_info)
        markup += f'<div class="text-muted small mt-1">{details}</div>'

    return markup