Coverage for heritrace / apis / zenodo.py: 100%

154 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-21 12:56 +0000

1# SPDX-FileCopyrightText: 2024-2025 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from datetime import datetime 

6from functools import lru_cache 

7from time import sleep 

8from urllib.parse import urlparse 

9 

10import requests 

11from requests.exceptions import ConnectionError, RequestException, Timeout 

12 

13 

class ZenodoRequestError(Exception):
    """Signal that a request to the Zenodo API could not be completed.

    Raised by the HTTP helper in this module once its retry budget is
    exhausted, and caught by the record-fetching code, which turns it into a
    ``None`` result.
    """

18 

19 

def is_zenodo_url(url):
    """Check if a URL is a Zenodo URL or DOI.

    Three forms are recognised:

    * any URL whose host is ``zenodo.org`` or ``www.zenodo.org``;
    * a ``doi.org`` / ``www.doi.org`` URL whose path starts with the Zenodo
      DOI prefix ``10.5281/zenodo.``;
    * a raw DOI string starting with ``10.5281/zenodo.``.

    Host names are compared case-insensitively and without port or
    credentials, so ``https://ZENODO.ORG:443/...`` is accepted as well.

    Args:
        url (str): The URL or DOI to check.

    Returns:
        bool: True if the value is recognised as a Zenodo reference, False
        otherwise (including non-string or unparseable input).
    """
    zenodo_doi_prefix = "10.5281/zenodo."

    # Anything that is not text cannot be a URL or a DOI.
    if not isinstance(url, str):
        return False

    try:
        parsed = urlparse(url)
        # ``hostname`` is lower-cased and stripped of port/userinfo,
        # unlike ``netloc``.
        host = parsed.hostname or ""
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. a broken IPv6 host).
        return False

    # Check for direct Zenodo URLs
    if host in ("zenodo.org", "www.zenodo.org"):
        return True

    # Check for DOI URLs
    if host in ("doi.org", "www.doi.org"):
        return parsed.path.lstrip("/").startswith(zenodo_doi_prefix)

    # Check for raw DOI strings
    return url.startswith(zenodo_doi_prefix)

40 

41 

def extract_zenodo_id(url):
    """
    Extract Zenodo record ID from URL or DOI.

    Supported inputs:

    * DOI URLs: ``https://doi.org/10.5281/zenodo.<id>``
    * Zenodo URLs: ``https://zenodo.org/record/<id>`` or
      ``https://zenodo.org/records/<id>`` (extra path segments after the ID
      are ignored)
    * raw DOI strings: ``10.5281/zenodo.<id>``

    Host names are compared case-insensitively and without port or
    credentials.

    Args:
        url (str): The URL or DOI to parse

    Returns:
        str: The Zenodo record ID or None if not found (this includes
        non-string input, unparseable URLs and an empty ID segment)
    """
    zenodo_doi_prefix = "10.5281/zenodo."

    if not isinstance(url, str):
        return None

    try:
        parsed = urlparse(url)
        # ``hostname`` is lower-cased and stripped of port/userinfo,
        # unlike ``netloc``.
        host = parsed.hostname or ""
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. a broken IPv6 host).
        return None

    # Handle DOI URLs
    if host in ("doi.org", "www.doi.org"):
        doi_path = parsed.path.lstrip("/")
        if doi_path.startswith(zenodo_doi_prefix):
            return doi_path[len(zenodo_doi_prefix):] or None
        return None

    # Handle direct Zenodo URLs
    if host in ("zenodo.org", "www.zenodo.org"):
        path_parts = parsed.path.strip("/").split("/")
        # "record" is looked up before "records".
        for marker in ("record", "records"):
            if marker in path_parts:
                id_index = path_parts.index(marker) + 1
                if id_index < len(path_parts):
                    return path_parts[id_index] or None
                # The marker is the last segment: there is no ID to return.
                return None
        return None

    # Handle raw DOI strings
    if url.startswith(zenodo_doi_prefix):
        return url[len(zenodo_doi_prefix):] or None

    return None

76 

77 

def make_request_with_retry(url, headers, max_retries=3, initial_delay=1):
    """
    Make HTTP request with exponential backoff retry strategy.

    Each attempt is a ``requests.get`` with a 5 second timeout. An attempt is
    considered failed, and is retried, when:

    * the server answers 429 (rate limited) - the wait before the next
      attempt is taken from the ``Retry-After`` header when it is a whole
      number of seconds, otherwise the current backoff delay is used;
    * the server answers with a 5xx status;
    * ``requests`` raises a ``RequestException`` (connection error, timeout,
      or the ``HTTPError`` produced by ``raise_for_status`` for other error
      statuses).

    The backoff delay doubles after every failed attempt and no sleep is
    performed after the final attempt.

    Args:
        url (str): The URL to request
        headers (dict): Request headers
        max_retries (int): Maximum number of retry attempts
        initial_delay (int): Initial delay between retries in seconds

    Returns:
        requests.Response: The response if successful

    Raises:
        ZenodoRequestError: If all retries fail
    """
    delay = initial_delay
    last_exception = None

    for attempt in range(max_retries):
        wait = delay
        try:
            response = requests.get(url, headers=headers, timeout=5)
            status = response.status_code

            if status == 429:
                # Record the failure so the final error message is meaningful.
                last_exception = ZenodoRequestError("Rate limited (HTTP 429)")
                try:
                    wait = max(0, int(response.headers.get("Retry-After", delay)))
                except (TypeError, ValueError):
                    # Retry-After may also be an HTTP-date; fall back to the
                    # regular backoff delay instead of crashing.
                    wait = delay
            elif 500 <= status < 600:
                # Server errors are treated as transient. They are recorded
                # rather than raised here, so the loop goes on to retry.
                last_exception = ZenodoRequestError(f"Server error: {status}")
            else:
                response.raise_for_status()
                return response

        except RequestException as e:
            # ConnectionError and Timeout are RequestException subclasses.
            last_exception = e

        # Don't sleep after the last attempt
        if attempt < max_retries - 1:
            sleep(wait)
            delay *= 2  # Exponential backoff

    raise ZenodoRequestError(
        f"Failed after {max_retries} attempts. Last error: {str(last_exception)}"
    ) from last_exception

125 

126 

@lru_cache(maxsize=1000)
def get_zenodo_data(record_id):
    """Fetch record data from Zenodo API with caching and retry logic.

    The record is requested from ``https://zenodo.org/api/records/<id>`` and
    its ``metadata`` object is flattened into a plain dictionary with the
    fields needed to build a citation.

    Results are memoised per ``record_id`` by ``lru_cache``. Note that a
    ``None`` result (failed lookup) is cached like any other value, and that
    every cache hit returns the same dictionary object, so callers should
    not mutate it.

    Args:
        record_id (str): The Zenodo record ID.

    Returns:
        dict: The flattened record metadata, or None if the request failed
        or the response body was not a JSON object.
    """
    headers = {
        "Accept": "application/json",
        # NOTE(review): placeholder identity - replace with the real
        # application name and contact address.
        "User-Agent": "YourApp/1.0 (your@email.com)",
    }

    try:
        response = make_request_with_retry(
            f"https://zenodo.org/api/records/{record_id}", headers=headers
        )
    except ZenodoRequestError:
        return None

    try:
        data = response.json()
    except ValueError:
        # The body was not valid JSON (e.g. an HTML error page).
        return None

    if not isinstance(data, dict):
        return None

    # ``or`` also covers keys that are present but explicitly null.
    metadata = data.get("metadata") or {}
    resource_type = metadata.get("resource_type") or {}
    journal = metadata.get("journal") or {}
    conference = metadata.get("conference") or {}
    creators = metadata.get("creators") or []

    # Extract all possible metadata for APA citation
    return {
        "title": metadata.get("title"),
        "authors": [
            {
                "name": creator.get("name", ""),
                "orcid": creator.get("orcid", ""),
                "affiliation": creator.get("affiliation", ""),
            }
            for creator in creators
        ],
        "doi": metadata.get("doi"),
        "publication_date": metadata.get("publication_date"),
        "version": metadata.get("version", ""),
        "type": resource_type.get("type", ""),
        "subtype": resource_type.get("subtype", ""),
        "journal": journal.get("title", ""),
        "journal_volume": journal.get("volume", ""),
        "journal_issue": journal.get("issue", ""),
        "journal_pages": journal.get("pages", ""),
        "conference": conference.get("title", ""),
        "conference_acronym": conference.get("acronym", ""),
        "conference_place": conference.get("place", ""),
        "conference_date": conference.get("date", ""),
        "publisher": metadata.get("publisher", ""),
        "keywords": metadata.get("keywords", []),
        "description": metadata.get("description", ""),
        "access_right": metadata.get("access_right", ""),
        "language": metadata.get("language", ""),
        "record_id": record_id,
        "notes": metadata.get("notes", ""),
    }

180 

181 

def format_apa_date(date_str):
    """Render an ISO-like date string in APA style (YYYY, Month DD).

    The input is tried against progressively coarser layouts - full date,
    year and month, year only - and rendered with matching precision.

    Args:
        date_str (str): A date such as ``2024-03-15``, ``2024-03`` or ``2024``.

    Returns:
        str: The APA-style date, or ``date_str`` unchanged when it matches
        none of the supported layouts.
    """
    # (layout to parse with, layout to render with), most precise first.
    layouts = (
        ("%Y-%m-%d", "%Y, %B %d"),
        ("%Y-%m", "%Y, %B"),
        ("%Y", "%Y"),
    )

    for parse_layout, render_layout in layouts:
        try:
            return datetime.strptime(date_str, parse_layout).strftime(render_layout)
        except ValueError:
            # Not this layout; fall through to the next, coarser one.
            continue

    return date_str

197 

198 

def format_authors_apa(authors):
    """Join author names into an APA-style author string.

    Args:
        authors (list): Author dictionaries, each with a ``"name"`` key.

    Returns:
        str: ``""`` for no authors; the single name (with the whitespace
        around its first comma normalised) for one author; ``"A & B"`` for
        two; ``"A, B, & C"`` for three or more.
    """
    if not authors:
        return ""

    names = [entry["name"] for entry in authors]

    if len(names) == 1:
        sole_name = names[0]
        # Only the first comma is significant: "Lastname, Firstname".
        family, comma, given = sole_name.partition(",")
        if comma:
            return f"{family.strip()}, {given.strip()}"
        return sole_name

    if len(names) == 2:
        first, second = names
        return f"{first} & {second}"

    leading = ", ".join(names[:-1])
    return f"{leading}, & {names[-1]}"

218 

219 

def format_zenodo_source(url):
    """Format Zenodo source for display with full APA citation.

    The record ID is extracted from ``url`` and its metadata fetched through
    ``get_zenodo_data``. When either step fails, a plain link to ``url`` is
    returned. Otherwise the result is an anchor containing the Zenodo logo
    and an APA-like citation (authors, date, title, journal or conference,
    publisher, version, DOI), optionally followed by a muted line with the
    record type, keywords, language and access right.

    All interpolated values (the URL and the remote metadata) are
    HTML-escaped, since the returned string is markup.

    Args:
        url (str): A Zenodo URL, a DOI URL or a raw Zenodo DOI.

    Returns:
        str: An HTML fragment.
    """
    # Local import keeps this block self-contained.
    from html import escape

    url_text = str(url)
    # A raw DOI is not a usable href on its own; only http(s) URLs are
    # linked to verbatim.
    is_web_url = url_text.lower().startswith(("http://", "https://"))

    record_id = extract_zenodo_id(url)
    if not record_id:
        safe_url = escape(url_text, quote=True)
        return f'<a href="{safe_url}" target="_blank">{safe_url}</a>'

    record_data = get_zenodo_data(record_id)
    if not record_data:
        # An ID was extracted from a non-URL input, so it is taken to be a
        # raw DOI and resolved through doi.org.
        fallback_href = url_text if is_web_url else f"https://doi.org/{url_text}"
        return (
            f'<a href="{escape(fallback_href, quote=True)}" target="_blank">'
            f"{escape(url_text, quote=True)}</a>"
        )

    # Create proper link URL for DOI
    link_url = (
        f"https://doi.org/{record_data['doi']}"
        if record_data["doi"]
        else f"https://zenodo.org/record/{record_id}"
    )
    href = url_text if is_web_url else link_url

    # Build APA citation
    citation_parts = []

    # Authors and Date
    authors = format_authors_apa(record_data["authors"])
    pub_date = (
        format_apa_date(record_data["publication_date"])
        if record_data["publication_date"]
        else "n.d."
    )
    # strip() avoids a leading space when there are no authors.
    citation_parts.append(f"{authors} ({pub_date})".strip())

    # Title (may be missing from the metadata)
    title = record_data["title"] or ""
    if record_data["type"] == "dataset":
        title = f"{title} [Data set]".strip()
    elif record_data["type"] == "software":
        title = f"{title} [Computer software]".strip()
    if title:
        citation_parts.append(title)

    # Container info (journal/conference)
    if record_data["journal"]:
        # Rendered as "Journal, volume(issue), pages".
        volume_issue = str(record_data["journal_volume"] or "")
        if record_data["journal_issue"]:
            volume_issue += f"({record_data['journal_issue']})"
        journal_info = [
            record_data["journal"],
            volume_issue,
            record_data["journal_pages"],
        ]
        citation_parts.append(", ".join(str(part) for part in journal_info if part))
    elif record_data["conference"]:
        conf_info = f"In {record_data['conference']}"
        if record_data["conference_place"]:
            conf_info += f" ({record_data['conference_place']})"
        citation_parts.append(conf_info)

    # Publisher info
    if record_data["publisher"]:
        citation_parts.append(record_data["publisher"])

    # Version info
    if record_data["version"]:
        citation_parts.append(f"Version {record_data['version']}")

    # DOI
    if record_data["doi"]:
        citation_parts.append(f"https://doi.org/{record_data['doi']}")

    citation = escape(". ".join(str(part) for part in citation_parts))

    fragments = [
        f'<a href="{escape(href, quote=True)}" target="_blank" class="zenodo-attribution">',
        '<img src="/static/images/zenodo-logo.png" alt="Zenodo" class="zenodo-icon mb-1 mx-1" style="width: 50px; height: 25px; margin-bottom: .3rem !important">',
        citation,
        "</a>",
    ]

    # Add additional metadata if available
    extra_info = []
    if record_data["type"]:
        type_info = f"Type: {record_data['type']}"
        # The subtype is only shown as a qualifier of the type.
        if record_data["subtype"]:
            type_info += f" ({record_data['subtype']})"
        extra_info.append(type_info)
    if record_data["keywords"]:
        keywords = ", ".join(str(keyword) for keyword in record_data["keywords"])
        extra_info.append(f"Keywords: {keywords}")
    if record_data["language"]:
        extra_info.append(f"Language: {record_data['language']}")
    if record_data["access_right"]:
        extra_info.append(f"Access: {record_data['access_right']}")

    if extra_info:
        fragments.append(
            f'<div class="text-muted small mt-1">{escape(" | ".join(extra_info))}</div>'
        )

    return "".join(fragments)