Coverage for heritrace/apis/zenodo.py: 100%

154 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-18 11:10 +0000

1from datetime import datetime 

2from functools import lru_cache 

3from time import sleep 

4from urllib.parse import urlparse 

5 

6import requests 

7from requests.exceptions import ConnectionError, RequestException, Timeout 

8 

9 

class ZenodoRequestError(Exception):
    """Error type used by this module to report failed Zenodo API requests."""

14 

15 

def is_zenodo_url(url):
    """Check whether a value is a Zenodo URL or a Zenodo DOI.

    Three forms are recognised:

    * direct Zenodo URLs (host ``zenodo.org`` or ``www.zenodo.org``),
    * DOI resolver URLs (host ``doi.org`` or ``www.doi.org``) whose path
      starts with the Zenodo DOI prefix ``10.5281/zenodo.``,
    * raw DOI strings starting with ``10.5281/zenodo.``.

    Args:
        url (str): The URL or DOI to inspect.

    Returns:
        bool: True if the value points at Zenodo, False otherwise. Values
        that are not strings or cannot be parsed as a URL yield False.
    """
    if not isinstance(url, str):
        return False

    doi_prefix = "10.5281/zenodo."

    try:
        parsed = urlparse(url)
        # ``hostname`` is lower-cased and stripped of port/userinfo, so
        # "https://ZENODO.org:443/..." is matched like "https://zenodo.org/...".
        host = parsed.hostname or ""
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unbalanced "[").
        return False

    # Direct Zenodo URLs
    if host in ("zenodo.org", "www.zenodo.org"):
        return True

    # DOI resolver URLs
    if host in ("doi.org", "www.doi.org"):
        return parsed.path.lstrip("/").startswith(doi_prefix)

    # Raw DOI strings
    return url.startswith(doi_prefix)

36 

37 

def extract_zenodo_id(url):
    """Extract the Zenodo record ID from a URL or DOI.

    Supported inputs are DOI resolver URLs
    (``https://doi.org/10.5281/zenodo.<id>``), direct Zenodo URLs containing
    a ``record`` or ``records`` path segment followed by the ID, and raw DOI
    strings (``10.5281/zenodo.<id>``).

    Args:
        url (str): The URL or DOI to parse.

    Returns:
        str: The Zenodo record ID, or None if the input is not a string,
        cannot be parsed, or does not contain a non-empty record ID.
    """
    if not isinstance(url, str):
        return None

    doi_prefix = "10.5281/zenodo."

    try:
        parsed = urlparse(url)
        # ``hostname`` is lower-cased and excludes port/userinfo.
        host = parsed.hostname or ""
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unbalanced "[").
        return None

    # Handle DOI resolver URLs
    if host in ("doi.org", "www.doi.org"):
        doi_path = parsed.path.lstrip("/")
        if doi_path.startswith(doi_prefix):
            return doi_path.split(doi_prefix)[1] or None
        return None

    # Handle direct Zenodo URLs
    if host in ("zenodo.org", "www.zenodo.org"):
        path_parts = parsed.path.strip("/").split("/")
        # "record" takes precedence over "records", as in the URL forms
        # this function has always accepted.
        for marker in ("record", "records"):
            if marker in path_parts:
                id_position = path_parts.index(marker) + 1
                if id_position < len(path_parts):
                    return path_parts[id_position] or None
                # The marker is the last segment: there is no ID after it.
                return None
        return None

    # Handle raw DOI strings
    if url.startswith(doi_prefix):
        return url.split(doi_prefix)[1] or None

    return None

72 

73 

def make_request_with_retry(url, headers, max_retries=3, initial_delay=1):
    """Make an HTTP GET request with an exponential backoff retry strategy.

    An attempt is considered failed, and is retried, when:

    * the server answers 429 (rate limited) -- the wait before the next
      attempt honours a numeric ``Retry-After`` header when present;
    * the server answers with a 5xx status;
    * ``requests`` raises (connection error, timeout, or the ``HTTPError``
      produced by ``raise_for_status`` for other error statuses).

    Args:
        url (str): The URL to request
        headers (dict): Request headers
        max_retries (int): Maximum number of attempts
        initial_delay (int): Initial delay between retries in seconds;
            doubled after every failed attempt

    Returns:
        requests.Response: The response if successful

    Raises:
        ZenodoRequestError: If all attempts fail
    """
    delay = initial_delay
    last_exception = None

    for attempt in range(max_retries):
        wait = delay
        try:
            response = requests.get(url, headers=headers, timeout=5)

            if response.status_code == 429:
                # Record the failure so the final error message is meaningful
                # even when every attempt was rate limited.
                last_exception = ZenodoRequestError("Rate limited (HTTP 429)")
                try:
                    wait = max(0, int(response.headers.get("Retry-After", delay)))
                except (TypeError, ValueError):
                    # Retry-After may also be an HTTP-date; fall back to the
                    # current backoff delay rather than crashing.
                    wait = delay
            elif 500 <= response.status_code < 600:
                # Recorded (not raised) so that the loop actually retries.
                last_exception = ZenodoRequestError(
                    f"Server error: {response.status_code}"
                )
            else:
                response.raise_for_status()
                return response

        except (RequestException, ConnectionError, Timeout) as e:
            last_exception = e

        # Don't sleep after the last attempt
        if attempt < max_retries - 1:
            sleep(wait)
            delay *= 2  # Exponential backoff

    raise ZenodoRequestError(
        f"Failed after {max_retries} attempts. Last error: {str(last_exception)}"
    ) from last_exception

121 

122 

@lru_cache(maxsize=1000)
def get_zenodo_data(record_id):
    """Fetch record data from the Zenodo API with caching and retry logic.

    The record is requested from ``https://zenodo.org/api/records/<id>`` via
    ``make_request_with_retry`` and its ``metadata`` object is flattened into
    a plain dict of the fields used to build an APA citation.

    Results are memoised with ``lru_cache``, so:

    * a ``None`` result (failed request) is cached as well and is returned
      for that ``record_id`` until it is evicted or the cache is cleared
      with ``get_zenodo_data.cache_clear()``;
    * the same dict object is handed to every caller -- treat it as
      read-only.

    Args:
        record_id (str): The Zenodo record ID.

    Returns:
        dict: The extracted metadata, or None if the request failed or the
        response body was not a JSON object.
    """
    headers = {
        "Accept": "application/json",
        "User-Agent": "YourApp/1.0 (your@email.com)",
    }

    try:
        response = make_request_with_retry(
            f"https://zenodo.org/api/records/{record_id}", headers=headers
        )
        data = response.json()
    except (ZenodoRequestError, ValueError):
        # ValueError covers a body that is not valid JSON.
        return None

    if not isinstance(data, dict):
        return None

    # ``or`` (rather than a .get default) also covers keys that are present
    # but explicitly null in the JSON payload.
    metadata = data.get("metadata") or {}
    resource_type = metadata.get("resource_type") or {}
    journal = metadata.get("journal") or {}
    conference = metadata.get("conference") or {}

    # Extract all possible metadata for APA citation
    return {
        "title": metadata.get("title"),
        "authors": [
            {
                "name": creator.get("name", ""),
                "orcid": creator.get("orcid", ""),
                "affiliation": creator.get("affiliation", ""),
            }
            for creator in (metadata.get("creators") or [])
        ],
        "doi": metadata.get("doi"),
        "publication_date": metadata.get("publication_date"),
        "version": metadata.get("version", ""),
        "type": resource_type.get("type", ""),
        "subtype": resource_type.get("subtype", ""),
        "journal": journal.get("title", ""),
        "journal_volume": journal.get("volume", ""),
        "journal_issue": journal.get("issue", ""),
        "journal_pages": journal.get("pages", ""),
        "conference": conference.get("title", ""),
        "conference_acronym": conference.get("acronym", ""),
        "conference_place": conference.get("place", ""),
        "conference_date": conference.get("date", ""),
        "publisher": metadata.get("publisher", ""),
        "keywords": metadata.get("keywords", []),
        "description": metadata.get("description", ""),
        "access_right": metadata.get("access_right", ""),
        "language": metadata.get("language", ""),
        "record_id": record_id,
        "notes": metadata.get("notes", ""),
    }

176 

177 

def format_apa_date(date_str):
    """Render an ISO-like date string in APA style.

    The input is tried, most precise first, as ``YYYY-MM-DD``, ``YYYY-MM``
    and ``YYYY``; the first pattern that parses determines the output
    (``YYYY, Month DD``, ``YYYY, Month`` or ``YYYY``).

    Args:
        date_str (str): The date to format.

    Returns:
        str: The formatted date, or ``date_str`` unchanged when none of the
        patterns match.
    """
    # (input pattern, output pattern), ordered from most to least precise.
    patterns = (
        ("%Y-%m-%d", "%Y, %B %d"),
        ("%Y-%m", "%Y, %B"),
        ("%Y", "%Y"),
    )
    for parse_pattern, render_pattern in patterns:
        try:
            return datetime.strptime(date_str, parse_pattern).strftime(render_pattern)
        except ValueError:
            continue
    return date_str

193 

194 

def format_authors_apa(authors):
    """Join author names into a single APA-style author string.

    Args:
        authors (list): Dicts that each carry the author's name under the
            ``"name"`` key.

    Returns:
        str: ``""`` for no authors; the (whitespace-normalised) name for one
        author; ``"A & B"`` for two; ``"A, B, & C"`` for three or more.
    """
    if not authors:
        return ""

    count = len(authors)

    if count == 1:
        # Normalise "Lastname,Firstname" spacing around the first comma.
        surname, comma, given = authors[0]["name"].partition(",")
        if comma:
            return f"{surname.strip()}, {given.strip()}"
        return surname

    if count == 2:
        first, second = authors[0]["name"], authors[1]["name"]
        return f"{first} & {second}"

    if count > 2:
        leading = ", ".join(entry["name"] for entry in authors[:-1])
        final = authors[-1]["name"]
        return f"{leading}, & {final}"

    return None

214 

215 

def format_zenodo_source(url):
    """Format a Zenodo source for display as HTML with a full APA citation.

    The record ID is extracted from ``url`` and its metadata fetched with
    ``get_zenodo_data``. When either step fails, a plain link to the source
    is returned instead of a citation.

    All interpolated values (the URL and every metadata field fetched from
    Zenodo) are HTML-escaped, because they originate outside this
    application.

    Args:
        url (str): The Zenodo URL or DOI of the source.

    Returns:
        str: An HTML fragment: an ``<a>`` element holding the Zenodo logo
        and the citation, optionally followed by a ``<div>`` with the
        resource type, keywords, language and access right.
    """
    # Function-scope import so this block does not depend on the module's
    # import section being edited.
    from html import escape

    url_text = str(url)

    # A bare DOI ("10.5281/zenodo.N") is not a usable href on its own: a
    # browser would resolve it relative to the current page.
    if url_text.startswith("10.5281/zenodo."):
        href = f"https://doi.org/{url_text}"
    else:
        href = url_text

    plain_link = f'<a href="{escape(href)}" target="_blank">{escape(url_text)}</a>'

    record_id = extract_zenodo_id(url)
    if not record_id:
        return plain_link

    record_data = get_zenodo_data(record_id)
    if not record_data:
        return plain_link

    # Build APA citation
    citation_parts = []

    # Authors and Date
    authors = format_authors_apa(record_data["authors"]) or ""
    pub_date = (
        format_apa_date(record_data["publication_date"])
        if record_data["publication_date"]
        else "n.d."
    )
    # strip() avoids a leading space when the record lists no authors.
    citation_parts.append(f"{authors} ({pub_date})".strip())

    # Title (skipped when the record has none, instead of emitting "None")
    title = record_data["title"]
    if title:
        if record_data["type"] == "dataset":
            title = f"{title} [Data set]"
        elif record_data["type"] == "software":
            title = f"{title} [Computer software]"
        citation_parts.append(str(title))

    # Container info (journal/conference)
    if record_data["journal"]:
        # APA form: "Journal, volume(issue), pages"
        volume_issue = f"{record_data['journal_volume'] or ''}"
        if record_data["journal_issue"]:
            volume_issue += f"({record_data['journal_issue']})"
        journal_info = [
            record_data["journal"],
            volume_issue,
            record_data["journal_pages"],
        ]
        citation_parts.append(", ".join(str(part) for part in journal_info if part))
    elif record_data["conference"]:
        conf_info = f"In {record_data['conference']}"
        if record_data["conference_place"]:
            conf_info += f" ({record_data['conference_place']})"
        citation_parts.append(conf_info)

    # Publisher info
    if record_data["publisher"]:
        citation_parts.append(str(record_data["publisher"]))

    # Version info
    if record_data["version"]:
        citation_parts.append(f"Version {record_data['version']}")

    # DOI
    if record_data["doi"]:
        citation_parts.append(f"https://doi.org/{record_data['doi']}")

    citation = ". ".join(citation_parts)

    markup = f'<a href="{escape(href)}" target="_blank" class="zenodo-attribution">'
    markup += '<img src="/static/images/zenodo-logo.png" alt="Zenodo" class="zenodo-icon mb-1 mx-1" style="width: 50px; height: 25px; margin-bottom: .3rem !important">'
    markup += escape(citation)
    markup += "</a>"

    # Add additional metadata if available
    extra_info = []
    if record_data["type"]:
        type_info = f"Type: {record_data['type']}"
        if record_data["subtype"]:
            type_info += f" ({record_data['subtype']})"
        extra_info.append(type_info)
    if record_data["keywords"]:
        keywords = ", ".join(str(keyword) for keyword in record_data["keywords"])
        extra_info.append(f"Keywords: {keywords}")
    if record_data["language"]:
        extra_info.append(f"Language: {record_data['language']}")
    if record_data["access_right"]:
        extra_info.append(f"Access: {record_data['access_right']}")

    if extra_info:
        markup += (
            f'<div class="text-muted small mt-1">{escape(" | ".join(extra_info))}</div>'
        )

    return markup