Coverage for heritrace / apis / zenodo.py: 99%

185 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-07-02 10:16 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

from datetime import datetime
from functools import lru_cache
from html import escape
from http import HTTPStatus
from time import sleep
from typing import TypedDict
from urllib.parse import urlparse

import requests
from requests.exceptions import ConnectionError as RequestsConnectionError
from requests.exceptions import RequestException, Timeout

15 

16 

17class ZenodoRecord(TypedDict): 

18 title: str 

19 authors: list[dict[str, str]] 

20 doi: str | None 

21 publication_date: str | None 

22 version: str 

23 type: str 

24 subtype: str 

25 journal: str 

26 journal_volume: str 

27 journal_issue: str 

28 journal_pages: str 

29 conference: str 

30 conference_acronym: str 

31 conference_place: str 

32 conference_date: str 

33 publisher: str 

34 keywords: list[str] 

35 description: str 

36 access_right: str 

37 language: str 

38 record_id: str 

39 notes: str 

40 

41 

# Author count that format_authors_apa compares against: exactly this many
# names are joined with " & ", while longer lists are comma-separated with
# ", & " before the final name.
# NOTE(review): despite the name, nothing visible in this file emits "et al.".
_MIN_AUTHORS_FOR_ET_AL = 2

43 

44 

class ZenodoRequestError(Exception):
    """Signal that a request to the Zenodo API could not be completed."""

47 

48 

def is_zenodo_url(url: str) -> bool:
    """Tell whether *url* refers to Zenodo.

    Three forms are accepted: a URL hosted on ``zenodo.org``, a ``doi.org``
    URL whose DOI starts with ``10.5281/zenodo.``, or that DOI given bare.
    Anything that is not a string yields ``False``.
    """
    if not isinstance(url, str):
        return False

    zenodo_doi_prefix = "10.5281/zenodo."
    location = urlparse(url)
    host = location.netloc

    if host in ("zenodo.org", "www.zenodo.org"):
        return True
    if host in ("doi.org", "www.doi.org"):
        # The DOI is the URL path without its leading slash.
        return location.path.lstrip("/").startswith(zenodo_doi_prefix)
    # No recognised host: the value may still be a bare DOI.
    return url.startswith(zenodo_doi_prefix)

60 

61 

62def extract_zenodo_id(url: str) -> str | None: 

63 """ 

64 Extract Zenodo record ID from URL or DOI. 

65 

66 Args: 

67 url (str): The URL or DOI to parse 

68 

69 Returns: 

70 str: The Zenodo record ID or None if not found 

71 """ 

72 if not isinstance(url, str): 

73 return None 

74 parsed = urlparse(url) 

75 if parsed.netloc in ["doi.org", "www.doi.org"]: 

76 doi_path = parsed.path.lstrip("/") 

77 if doi_path.startswith("10.5281/zenodo."): 

78 return doi_path.split("10.5281/zenodo.")[1] 

79 elif parsed.netloc in ["zenodo.org", "www.zenodo.org"]: 

80 path_parts = parsed.path.strip("/").split("/") 

81 if "record" in path_parts: 

82 return path_parts[path_parts.index("record") + 1] 

83 if "records" in path_parts: 

84 return path_parts[path_parts.index("records") + 1] 

85 elif url.startswith("10.5281/zenodo."): 

86 return url.split("10.5281/zenodo.")[1] 

87 return None 

88 

89 

def make_request_with_retry(
    url: str,
    headers: dict[str, str],
    max_retries: int = 3,
    initial_delay: float = 1,
) -> requests.Response:
    """
    Make HTTP request with exponential backoff retry strategy.

    Rate-limit responses (429), server errors (5xx), error statuses reported
    by ``raise_for_status`` and transport failures all count as a failed
    attempt and are retried until ``max_retries`` attempts have been made.

    Args:
        url (str): The URL to request
        headers (dict): Request headers
        max_retries (int): Maximum number of attempts
        initial_delay (float): Initial delay between retries in seconds;
            doubled after every failed attempt

    Returns:
        requests.Response: The response if successful

    Raises:
        ZenodoRequestError: If all retries fail
    """
    delay = initial_delay
    last_exception: Exception | None = None

    for attempt in range(max_retries):
        # Pause before the next attempt; a 429 response may override it.
        wait = delay
        try:
            response = requests.get(url, headers=headers, timeout=5)

            # Check if we got rate limited
            if response.status_code == HTTPStatus.TOO_MANY_REQUESTS:
                try:
                    wait = max(0.0, float(response.headers.get("Retry-After", delay)))
                except (TypeError, ValueError):
                    # Retry-After may be an HTTP-date; fall back to the backoff delay.
                    wait = delay
                msg = f"Rate limited: {response.status_code}"
                raise ZenodoRequestError(msg)

            # If we get a 5xx error, retry
            if HTTPStatus.INTERNAL_SERVER_ERROR <= response.status_code < 600:  # noqa: PLR2004
                msg = f"Server error: {response.status_code}"
                raise ZenodoRequestError(msg)

            response.raise_for_status()

        except (RequestException, ZenodoRequestError) as e:
            # ZenodoRequestError must be caught here, otherwise the 429/5xx
            # branches above would abort the loop instead of retrying.
            last_exception = e

            # Don't sleep after the last attempt
            if attempt < max_retries - 1:
                sleep(wait)
                delay *= 2  # Exponential backoff
        else:
            return response

    msg = f"Failed after {max_retries} attempts. Last error: {last_exception!s}"
    raise ZenodoRequestError(msg)

143 

144 

@lru_cache(maxsize=1000)
def get_zenodo_data(
    record_id: str,
) -> ZenodoRecord | None:
    """Fetch record data from Zenodo API with caching and retry logic.

    Args:
        record_id (str): Identifier of the Zenodo record to fetch

    Returns:
        ZenodoRecord | None: The flattened record metadata, or None when the
        request fails or the response body is not a JSON object. Results are
        memoised by ``lru_cache``, so a None result is cached as well.
    """
    headers = {
        "Accept": "application/json",
        # NOTE(review): this looks like a placeholder User-Agent; confirm the
        # value the project wants to send to Zenodo.
        "User-Agent": "YourApp/1.0 (your@email.com)",
    }

    try:
        response = make_request_with_retry(
            f"https://zenodo.org/api/records/{record_id}", headers=headers
        )
        data = response.json()
    except (ZenodoRequestError, ValueError):
        # ValueError covers a response body that is not valid JSON.
        return None

    if not isinstance(data, dict):
        return None

    metadata = data.get("metadata")
    if not isinstance(metadata, dict):
        metadata = {}

    def section(key: str) -> dict:
        """Return the nested object stored under *key*, or {} if absent/null."""
        value = metadata.get(key)
        return value if isinstance(value, dict) else {}

    resource_type = section("resource_type")
    journal = section("journal")
    conference = section("conference")

    # Extract all possible metadata for APA citation
    return {
        "title": metadata.get("title"),
        "authors": [
            {
                "name": creator.get("name", ""),
                "orcid": creator.get("orcid", ""),
                "affiliation": creator.get("affiliation", ""),
            }
            # "or []" keeps an explicit null creators list from breaking iteration.
            for creator in metadata.get("creators") or []
        ],
        "doi": metadata.get("doi"),
        "publication_date": metadata.get("publication_date"),
        "version": metadata.get("version", ""),
        "type": resource_type.get("type", ""),
        "subtype": resource_type.get("subtype", ""),
        "journal": journal.get("title", ""),
        "journal_volume": journal.get("volume", ""),
        "journal_issue": journal.get("issue", ""),
        "journal_pages": journal.get("pages", ""),
        "conference": conference.get("title", ""),
        "conference_acronym": conference.get("acronym", ""),
        "conference_place": conference.get("place", ""),
        "conference_date": conference.get("date", ""),
        "publisher": metadata.get("publisher", ""),
        "keywords": metadata.get("keywords", []),
        "description": metadata.get("description", ""),
        "access_right": metadata.get("access_right", ""),
        "language": metadata.get("language", ""),
        "record_id": record_id,
        "notes": metadata.get("notes", ""),
    }

197 

198 

def format_apa_date(date_str: str) -> str:
    """Render an ISO-like date string in APA style (YYYY, Month DD).

    Full dates, year-month values and bare years are tried in that order;
    input matching none of them is returned unchanged.
    """
    # (format to parse with, format to render with), most specific first.
    layouts = (
        ("%Y-%m-%d", "%Y, %B %d"),
        ("%Y-%m", "%Y, %B"),
        ("%Y", "%Y"),
    )
    for parse_format, render_format in layouts:
        try:
            return datetime.strptime(date_str, parse_format).strftime(render_format)
        except ValueError:
            continue
    return date_str

214 

215 

def format_authors_apa(authors: list[dict[str, str]]) -> str:
    """Format author list in APA style.

    Args:
        authors: Author dicts; only the "name" key is read.

    Returns:
        An empty string for no authors, the (whitespace-normalised) name for
        one author, "A & B" for two, and "A, B, & C" for three or more.
    """
    if not authors:
        return ""

    names = [author["name"] for author in authors]

    if len(names) == 1:
        # Split on the first comma to normalise "Lastname, Firstname" spacing.
        family, comma, given = names[0].partition(",")
        if comma:
            return f"{family.strip()}, {given.strip()}"
        return names[0]

    if len(names) == _MIN_AUTHORS_FOR_ET_AL:
        return f"{names[0]} & {names[1]}"

    # Every remaining case has more than two authors.
    return f"{', '.join(names[:-1])}, & {names[-1]}"

236 

237 

def _build_citation_parts(
    record_data: ZenodoRecord,
) -> list[str]:
    """Assemble the ordered text fragments of an APA-style citation.

    The fragments are: authors with date, title (with a bracketed label for
    datasets and software), journal or conference details, publisher,
    version and DOI link. Empty fields are skipped, so every returned item
    is a non-empty string.
    """
    citation_parts: list[str] = []

    authors = format_authors_apa(record_data["authors"])
    pub_date = (
        format_apa_date(record_data["publication_date"])
        if record_data["publication_date"]
        else "n.d."
    )
    # Without authors, emit only the date instead of a leading space.
    citation_parts.append(f"{authors} ({pub_date})" if authors else f"({pub_date})")

    type_labels = {"dataset": "[Data set]", "software": "[Computer software]"}
    type_label = type_labels.get(record_data["type"])
    # A missing title must not become the text "None" or a non-string item.
    title = " ".join(
        str(piece) for piece in (record_data["title"], type_label) if piece
    )
    if title:
        citation_parts.append(title)

    _append_container_info(citation_parts, record_data)

    if record_data["publisher"]:
        citation_parts.append(record_data["publisher"])
    if record_data["version"]:
        citation_parts.append(f"Version {record_data['version']}")
    if record_data["doi"]:
        citation_parts.append(f"https://doi.org/{record_data['doi']}")

    return citation_parts

268 

269 

270def _append_container_info( 

271 citation_parts: list[str], 

272 record_data: ZenodoRecord, 

273) -> None: 

274 if record_data["journal"]: 

275 journal_info = [record_data["journal"]] 

276 if record_data["journal_volume"]: 

277 journal_info.append(f"{record_data['journal_volume']}") 

278 if record_data["journal_issue"]: 

279 journal_info.append(f"({record_data['journal_issue']})") 

280 if record_data["journal_pages"]: 

281 journal_info.append(f", {record_data['journal_pages']}") 

282 citation_parts.append(", ".join(journal_info)) 

283 elif record_data["conference"]: 

284 conf_info = [f"In {record_data['conference']}"] 

285 if record_data["conference_place"]: 

286 conf_info.append(f" ({record_data['conference_place']})") 

287 citation_parts.append("".join(conf_info)) 

288 

289 

290def _build_extra_info( 

291 record_data: ZenodoRecord, 

292) -> list[str]: 

293 extra_info: list[str] = [] 

294 if record_data["type"]: 

295 extra_info.append(f"Type: {record_data['type']}") 

296 if record_data["subtype"]: 

297 extra_info[-1] += f" ({record_data['subtype']})" 

298 if record_data["keywords"]: 

299 extra_info.append(f"Keywords: {', '.join(record_data['keywords'])}") 

300 if record_data["language"]: 

301 extra_info.append(f"Language: {record_data['language']}") 

302 if record_data["access_right"]: 

303 extra_info.append(f"Access: {record_data['access_right']}") 

304 return extra_info 

305 

306 

def format_zenodo_source(url: str) -> str:
    """Format Zenodo source for display with full APA citation.

    Args:
        url (str): Zenodo URL, ``doi.org`` URL or bare Zenodo DOI

    Returns:
        str: An HTML fragment. When the record ID cannot be extracted or the
        record cannot be fetched, this is a plain link to *url*; otherwise it
        is a link containing the Zenodo logo and the citation, followed by a
        line of extra details when any are available. The URL and all
        metadata text are HTML-escaped before being embedded.
    """
    url_text = str(url)
    safe_url = escape(url_text)
    # A bare DOI is not a valid link target on its own; resolve it via doi.org.
    href = (
        f"https://doi.org/{safe_url}"
        if url_text.startswith("10.5281/zenodo.")
        else safe_url
    )
    plain_link = f'<a href="{href}" target="_blank">{safe_url}</a>'

    record_id = extract_zenodo_id(url)
    if not record_id:
        return plain_link

    record_data = get_zenodo_data(record_id)
    if not record_data:
        return plain_link

    # Metadata comes from a remote service, so escape it before embedding.
    citation = ". ".join(
        escape(str(part)) for part in _build_citation_parts(record_data)
    )

    markup = (
        f'<a href="{href}" target="_blank" class="zenodo-attribution">'
        '<img src="/static/images/zenodo-logo.png"'
        ' alt="Zenodo" class="zenodo-icon mb-1 mx-1"'
        ' style="width: 50px; height: 25px;'
        ' margin-bottom: .3rem !important">'
        f"{citation}</a>"
    )

    extra_info = [escape(str(item)) for item in _build_extra_info(record_data)]
    if extra_info:
        markup += f'<div class="text-muted small mt-1">{" | ".join(extra_info)}</div>'

    return markup