Coverage for heritrace / apis / zenodo.py: 100%
154 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-21 12:56 +0000
1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from datetime import datetime
6from functools import lru_cache
7from time import sleep
8from urllib.parse import urlparse
10import requests
11from requests.exceptions import ConnectionError, RequestException, Timeout
class ZenodoRequestError(Exception):
    """Error type used by this module to signal a failed Zenodo API request."""
def is_zenodo_url(url):
    """Check if a URL is a Zenodo URL or DOI.

    Three forms are recognised:

    * direct Zenodo URLs (host ``zenodo.org`` or ``www.zenodo.org``),
    * DOI resolver URLs (host ``doi.org`` or ``www.doi.org``) whose path
      starts with the Zenodo DOI prefix ``10.5281/zenodo.``,
    * raw DOI strings starting with ``10.5281/zenodo.``.

    Args:
        url (str): The URL or DOI to test. Non-string values are accepted
            and simply reported as "not Zenodo".

    Returns:
        bool: True if ``url`` matches one of the forms above, else False.
    """
    # Anything that is not text cannot be a Zenodo reference. Checking this
    # explicitly replaces the former bare ``except:``, which also swallowed
    # KeyboardInterrupt/SystemExit.
    if not isinstance(url, str):
        return False

    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. unbalanced IPv6
        # brackets in the host part).
        return False

    # Direct Zenodo URLs
    if parsed.netloc in ("zenodo.org", "www.zenodo.org"):
        return True

    # DOI resolver URLs
    if parsed.netloc in ("doi.org", "www.doi.org"):
        return parsed.path.lstrip("/").startswith("10.5281/zenodo.")

    # Raw DOI strings
    return url.startswith("10.5281/zenodo.")
def extract_zenodo_id(url):
    """
    Extract Zenodo record ID from URL or DOI.

    Supported inputs are DOI resolver URLs (``doi.org`` / ``www.doi.org``)
    carrying a ``10.5281/zenodo.<id>`` path, direct Zenodo URLs containing a
    ``record`` or ``records`` path segment followed by the ID, and raw
    ``10.5281/zenodo.<id>`` DOI strings.

    Args:
        url (str): The URL or DOI to parse

    Returns:
        str: The Zenodo record ID or None if not found (including when the
        input is not a string, cannot be parsed, or carries an empty ID)
    """
    # Explicit validation replaces the former bare ``except:``, which hid
    # every error (even KeyboardInterrupt) behind a silent ``None``.
    if not isinstance(url, str):
        return None

    try:
        parsed = urlparse(url)
    except ValueError:
        # Malformed host part (e.g. unbalanced IPv6 brackets).
        return None

    doi_prefix = "10.5281/zenodo."

    # Handle DOI URLs
    if parsed.netloc in ("doi.org", "www.doi.org"):
        doi_path = parsed.path.lstrip("/")
        if doi_path.startswith(doi_prefix):
            # ``or None`` keeps the documented contract for an empty suffix.
            return doi_path.split(doi_prefix)[1] or None

    # Handle direct Zenodo URLs
    elif parsed.netloc in ("zenodo.org", "www.zenodo.org"):
        path_parts = parsed.path.strip("/").split("/")
        # "record" is checked before "records", as in the original order.
        for marker in ("record", "records"):
            if marker in path_parts:
                id_index = path_parts.index(marker) + 1
                # The marker may be the last segment (no ID after it).
                if id_index < len(path_parts):
                    return path_parts[id_index] or None
                return None

    # Handle raw DOI strings
    elif url.startswith(doi_prefix):
        return url.split(doi_prefix)[1] or None

    return None
def make_request_with_retry(url, headers, max_retries=3, initial_delay=1):
    """
    Make HTTP request with exponential backoff retry strategy.

    Each attempt issues ``requests.get`` with a 5 second timeout. An attempt
    is considered failed, and retried, when:

    * the server answers 429 (the wait before the next attempt is taken from
      the ``Retry-After`` header when it is an integer number of seconds,
      otherwise the current backoff delay is used),
    * the server answers with a 5xx status,
    * ``requests`` raises (connection error, timeout, or the ``HTTPError``
      produced by ``raise_for_status`` for any other error status).

    Between failed attempts (other than 429) the function sleeps ``delay``
    seconds and then doubles it. No sleep happens after the final attempt.

    Args:
        url (str): The URL to request
        headers (dict): Request headers
        max_retries (int): Maximum number of retry attempts
        initial_delay (int): Initial delay between retries in seconds

    Returns:
        requests.Response: The response if successful

    Raises:
        ZenodoRequestError: If all retries fail
    """
    delay = initial_delay
    last_exception = None

    for attempt in range(max_retries):
        has_more_attempts = attempt < max_retries - 1
        try:
            response = requests.get(url, headers=headers, timeout=5)

            # Check if we got rate limited
            if response.status_code == 429:
                # Record the cause so the final message is never "None".
                last_exception = ZenodoRequestError("Rate limited (HTTP 429)")
                if has_more_attempts:
                    # Retry-After may also be an HTTP-date; fall back to the
                    # current backoff delay when it is not a plain integer.
                    try:
                        retry_after = int(response.headers.get("Retry-After", delay))
                    except (TypeError, ValueError):
                        retry_after = delay
                    # sleep() rejects negative values.
                    sleep(max(retry_after, 0))
                continue

            # If we get a 5xx error, retry
            if 500 <= response.status_code < 600:
                raise ZenodoRequestError(f"Server error: {response.status_code}")

            response.raise_for_status()
            return response

        # ZenodoRequestError must be listed here: it is not a
        # RequestException, so without it a 5xx answer would escape the loop
        # on the first attempt instead of being retried.
        except (RequestException, ConnectionError, Timeout, ZenodoRequestError) as e:
            last_exception = e

            # Don't sleep after the last attempt
            if has_more_attempts:
                sleep(delay)
                delay *= 2  # Exponential backoff

    raise ZenodoRequestError(
        f"Failed after {max_retries} attempts. Last error: {str(last_exception)}"
    ) from last_exception
@lru_cache(maxsize=1000)
def get_zenodo_data(record_id):
    """Fetch record data from Zenodo API with caching and retry logic.

    The record is requested from ``https://zenodo.org/api/records/<id>`` via
    ``make_request_with_retry`` and its ``metadata`` object is flattened into
    a plain dict of the fields used to build an APA citation.

    Because the function is wrapped in ``lru_cache``, whatever it returns for
    a given ``record_id`` is reused on later calls. This includes ``None``
    after a failed fetch, and the very same dict object on success, so
    callers should treat the result as read-only.

    Args:
        record_id (str): Zenodo record identifier (must be hashable, as it
            is the cache key).

    Returns:
        dict | None: The flattened metadata, or None when the request fails,
        the body is not valid JSON, or the payload is not a JSON object.
    """
    headers = {
        "Accept": "application/json",
        # NOTE(review): this looks like a placeholder User-Agent; confirm the
        # value the project actually wants to advertise.
        "User-Agent": "YourApp/1.0 (your@email.com)",
    }

    try:
        response = make_request_with_retry(
            f"https://zenodo.org/api/records/{record_id}", headers=headers
        )
        data = response.json()
    except (ZenodoRequestError, ValueError):
        # ValueError covers a body that is not valid JSON.
        return None

    if not isinstance(data, dict):
        return None

    # ``or {}`` / ``or []`` (rather than a .get default) also covers keys that
    # are present but null, which would otherwise break the nested lookups.
    metadata = data.get("metadata") or {}
    resource_type = metadata.get("resource_type") or {}
    journal = metadata.get("journal") or {}
    conference = metadata.get("conference") or {}
    creators = metadata.get("creators") or []

    # Extract all possible metadata for APA citation
    return {
        "title": metadata.get("title"),
        "authors": [
            {
                "name": creator.get("name", ""),
                "orcid": creator.get("orcid", ""),
                "affiliation": creator.get("affiliation", ""),
            }
            for creator in creators
        ],
        "doi": metadata.get("doi"),
        "publication_date": metadata.get("publication_date"),
        "version": metadata.get("version", ""),
        "type": resource_type.get("type", ""),
        "subtype": resource_type.get("subtype", ""),
        "journal": journal.get("title", ""),
        "journal_volume": journal.get("volume", ""),
        "journal_issue": journal.get("issue", ""),
        "journal_pages": journal.get("pages", ""),
        "conference": conference.get("title", ""),
        "conference_acronym": conference.get("acronym", ""),
        "conference_place": conference.get("place", ""),
        "conference_date": conference.get("date", ""),
        "publisher": metadata.get("publisher", ""),
        "keywords": metadata.get("keywords", []),
        "description": metadata.get("description", ""),
        "access_right": metadata.get("access_right", ""),
        "language": metadata.get("language", ""),
        "record_id": record_id,
        "notes": metadata.get("notes", ""),
    }
def format_apa_date(date_str):
    """Render an ISO-like date string the way an APA citation expects it.

    The input is tried against ``YYYY-MM-DD``, then ``YYYY-MM``, then
    ``YYYY``; the first pattern that parses decides the output
    (``"YYYY, Month DD"``, ``"YYYY, Month"`` or ``"YYYY"``). A string that
    matches none of them is handed back untouched.
    """
    # (pattern to parse with, pattern to render with), most specific first.
    layouts = (
        ("%Y-%m-%d", "%Y, %B %d"),
        ("%Y-%m", "%Y, %B"),
        ("%Y", "%Y"),
    )

    for parse_layout, render_layout in layouts:
        try:
            return datetime.strptime(date_str, parse_layout).strftime(render_layout)
        except ValueError:
            # Not this layout; fall through to the next, coarser one.
            continue

    return date_str
def format_authors_apa(authors):
    """Join author names into the author segment of an APA citation.

    ``authors`` is a sequence of dicts that each carry a ``"name"`` key.

    * no authors -> ``""``
    * one author -> the name; if it contains a comma, the text on either
      side of the first comma is stripped and re-joined with ``", "``
    * two authors -> ``"A & B"``
    * three or more -> ``"A, B, & C"``
    """
    if not authors:
        return ""

    names = [entry["name"] for entry in authors]

    if len(names) == 1:
        only_name = names[0]
        # Normalise spacing around the first comma ("Lastname, Firstname").
        family, comma, given = only_name.partition(",")
        if comma:
            return f"{family.strip()}, {given.strip()}"
        return only_name

    if len(names) == 2:
        first, second = names
        return f"{first} & {second}"

    leading = ", ".join(names[:-1])
    return f"{leading}, & {names[-1]}"
def _zenodo_citation_parts(record_data):
    """Return the ordered, unescaped text segments of the APA citation."""
    parts = []

    # Authors and Date
    authors = format_authors_apa(record_data["authors"])
    pub_date = (
        format_apa_date(record_data["publication_date"])
        if record_data["publication_date"]
        else "n.d."
    )
    parts.append(f"{authors} ({pub_date})")

    # Title (skipped when missing so the final join never sees None)
    title = record_data["title"]
    if title:
        if record_data["type"] == "dataset":
            title = f"{title} [Data set]"
        elif record_data["type"] == "software":
            title = f"{title} [Computer software]"
        parts.append(str(title))

    # Container info (journal/conference)
    if record_data["journal"]:
        journal_info = [str(record_data["journal"])]
        # Volume and issue belong together, e.g. "12(3)"; pages follow after
        # a single comma, giving "Journal, 12(3), 45-67".
        volume_issue = str(record_data["journal_volume"] or "")
        if record_data["journal_issue"]:
            volume_issue += f"({record_data['journal_issue']})"
        if volume_issue:
            journal_info.append(volume_issue)
        if record_data["journal_pages"]:
            journal_info.append(str(record_data["journal_pages"]))
        parts.append(", ".join(journal_info))
    elif record_data["conference"]:
        conference = f"In {record_data['conference']}"
        if record_data["conference_place"]:
            conference += f" ({record_data['conference_place']})"
        parts.append(conference)

    # Publisher info
    if record_data["publisher"]:
        parts.append(str(record_data["publisher"]))

    # Version info
    if record_data["version"]:
        parts.append(f"Version {record_data['version']}")

    # DOI
    if record_data["doi"]:
        parts.append(f"https://doi.org/{record_data['doi']}")

    return parts


def _zenodo_extra_info(record_data):
    """Return the unescaped "Type / Keywords / Language / Access" labels."""
    extra_info = []

    if record_data["type"]:
        type_label = f"Type: {record_data['type']}"
        # The subtype only qualifies a type, so it is shown only with one.
        if record_data["subtype"]:
            type_label += f" ({record_data['subtype']})"
        extra_info.append(type_label)
    if record_data["keywords"]:
        keywords = ", ".join(str(keyword) for keyword in record_data["keywords"])
        extra_info.append(f"Keywords: {keywords}")
    if record_data["language"]:
        extra_info.append(f"Language: {record_data['language']}")
    if record_data["access_right"]:
        extra_info.append(f"Access: {record_data['access_right']}")

    return extra_info


def format_zenodo_source(url):
    """Format Zenodo source for display with full APA citation.

    When no record ID can be extracted from ``url``, or the record metadata
    cannot be fetched, the result is a plain anchor whose text and target are
    ``url``. Otherwise the result is an anchor (still pointing at ``url``)
    containing the Zenodo logo and the citation segments joined by ``". "``,
    optionally followed by a ``<div>`` listing type, keywords, language and
    access right.

    Both ``url`` and every value coming from the Zenodo record are
    HTML-escaped before being placed in the markup, since neither is under
    this module's control.

    Args:
        url (str): Zenodo URL or DOI of the source.

    Returns:
        str: An HTML fragment.
    """
    # Imported locally so this block does not depend on a file-level import.
    from html import escape

    safe_url = escape(str(url), quote=True)
    plain_link = f'<a href="{safe_url}" target="_blank">{safe_url}</a>'

    record_id = extract_zenodo_id(url)
    if not record_id:
        return plain_link

    record_data = get_zenodo_data(record_id)
    if not record_data:
        return plain_link

    # Build APA citation
    citation = escape(". ".join(_zenodo_citation_parts(record_data)))

    markup = f'<a href="{safe_url}" target="_blank" class="zenodo-attribution">'
    markup += '<img src="/static/images/zenodo-logo.png" alt="Zenodo" class="zenodo-icon mb-1 mx-1" style="width: 50px; height: 25px; margin-bottom: .3rem !important">'
    markup += citation
    markup += "</a>"

    # Add additional metadata if available
    extra_info = _zenodo_extra_info(record_data)
    if extra_info:
        markup += f'<div class="text-muted small mt-1">{escape(" | ".join(extra_info))}</div>'

    return markup