Coverage for src / piccione / upload / on_zenodo.py: 76%

170 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-03-21 11:49 +0000

1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

import argparse
import time
from pathlib import Path
from urllib.parse import quote

import requests
import yaml
from rich.progress import Progress, BarColumn, DownloadColumn, TransferSpeedColumn, TimeRemainingColumn

12 

13 

14def get_headers(token: str, user_agent: str, content_type: str | None = None) -> dict[str, str]: 

15 headers = { 

16 "Authorization": f"Bearer {token}", 

17 "User-Agent": user_agent, 

18 } 

19 if content_type: 

20 headers["Content-Type"] = content_type 

21 return headers 

22 

23 

class ProgressFileWrapper:
    """Binary file reader that reports every read to a progress task.

    The file is opened in ``__init__`` and stays open until ``close`` is
    called, either directly or by leaving a ``with`` block. ``len()`` of the
    wrapper is the file size in bytes, measured when the wrapper is created.
    """

    def __init__(self, file_path: str, progress: Progress, task_id: int):
        self.file_path = file_path
        self.progress = progress
        self.task_id = task_id
        # Size is taken before opening and is not refreshed afterwards.
        self.file_size = Path(file_path).stat().st_size
        self.fp = open(file_path, "rb")

    def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes and advance the progress task by the amount read."""
        chunk = self.fp.read(size)
        self.progress.update(self.task_id, advance=len(chunk))
        return chunk

    def __len__(self) -> int:
        """Return the file size in bytes recorded at construction time."""
        return self.file_size

    def close(self) -> None:
        """Close the underlying file object."""
        self.fp.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Exceptions are not suppressed; the file is simply closed.
        self.close()

48 

49 

def upload_file_with_retry(base_url: str, record_id: str, file_path: str, token: str, user_agent: str) -> None:
    """Upload one local file to a draft record, retrying on network errors.

    Each attempt registers the file key on the draft, streams the file
    content with a progress bar, and commits the file. Timeouts and
    connection errors trigger another attempt after an exponential backoff
    capped at 60 seconds; there is no upper bound on the number of attempts.

    Args:
        base_url: API base URL, without a trailing slash.
        record_id: Identifier of the draft record receiving the file.
        file_path: Path of the local file to upload.
        token: Access token for the API.
        user_agent: Value of the ``User-Agent`` header.

    Raises:
        requests.exceptions.HTTPError: If any of the requests returns an
            error status. HTTP errors are reported and re-raised, not retried.
    """
    source = Path(file_path)
    filename = source.name
    file_size = source.stat().st_size
    files_url = f"{base_url}/records/{record_id}/draft/files"

    # Percent-encode the name before placing it in the URL path: characters
    # such as '#', '?' or '%' would otherwise be read as URL syntax.
    file_url = f"{files_url}/{quote(filename)}"
    content_url = f"{file_url}/content"
    commit_url = f"{file_url}/commit"

    # (connect, read) timeout for the two small control requests. Without a
    # timeout a stalled connection would block forever and never reach the
    # retry handling below.
    control_timeout = (30, 300)

    attempt = 0
    while True:
        attempt += 1
        try:
            print(f"\nAttempt {attempt}: {filename}")

            # NOTE(review): the key is registered again on every attempt;
            # confirm the server accepts a repeated key after a failed upload.
            response = requests.post(
                files_url,
                headers=get_headers(token, user_agent, "application/json"),
                json=[{"key": filename}],
                timeout=control_timeout,
            )
            response.raise_for_status()

            with Progress(
                "[progress.description]{task.description}",
                BarColumn(),
                DownloadColumn(),
                TransferSpeedColumn(),
                TimeRemainingColumn(),
            ) as progress:
                task_id = progress.add_task(filename, total=file_size)
                with ProgressFileWrapper(file_path, progress, task_id) as wrapper:
                    response = requests.put(
                        content_url,
                        data=wrapper,
                        headers=get_headers(token, user_agent, "application/octet-stream"),
                        timeout=(30, 3600),
                    )
                    response.raise_for_status()

            response = requests.post(
                commit_url,
                headers=get_headers(token, user_agent),
                timeout=control_timeout,
            )
            response.raise_for_status()

            print(f"[OK] {filename} uploaded successfully")
            break

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            print(f"[ERROR] Network error: {e}")
            # Backoff doubles per attempt (1s, 2s, 4s, ...) up to 60s.
            wait = min(2 ** (attempt - 1), 60)
            print(f"Retrying in {wait}s...")
            time.sleep(wait)
        except requests.exceptions.HTTPError as e:
            print(f"[ERROR] HTTP error: {e}")
            raise

101 

102 

def create_draft(base_url: str, token: str, user_agent: str, metadata: dict) -> dict:
    """Create a new draft record from ``metadata`` and return the server's JSON reply.

    On an error status the status code and response body are printed before
    the HTTP error is raised.

    Raises:
        requests.exceptions.HTTPError: If the request returns an error status.
    """
    records_url = f"{base_url}/records"
    json_headers = get_headers(token, user_agent, "application/json")
    response = requests.post(records_url, headers=json_headers, json=metadata)

    if not response.ok:
        print(f"Error creating draft: {response.status_code}")
        print(f"Response: {response.text}")
    response.raise_for_status()

    new_draft = response.json()
    print(f"Created new draft: {new_draft['id']}")
    return new_draft

116 

117 

def create_new_version(base_url: str, token: str, record_id: str, user_agent: str) -> dict:
    """Request a new version of record ``record_id`` and return the resulting draft as JSON.

    Raises:
        requests.exceptions.HTTPError: If the request returns an error status.
    """
    versions_url = f"{base_url}/records/{record_id}/versions"
    response = requests.post(versions_url, headers=get_headers(token, user_agent))
    response.raise_for_status()

    version_draft = response.json()
    print(f"Created new version draft: {version_draft['id']} (from {record_id})")
    return version_draft

128 

129 

def delete_draft_files(base_url: str, token: str, record_id: str, user_agent: str) -> None:
    """Delete every file currently attached to the draft ``record_id``.

    The file list is read from the ``entries`` field of the draft's files
    listing; each entry is then deleted by its ``key``.

    Raises:
        requests.exceptions.HTTPError: If the listing or any deletion returns
            an error status.
    """
    headers = get_headers(token, user_agent)
    files_url = f"{base_url}/records/{record_id}/draft/files"

    response = requests.get(files_url, headers=headers)
    response.raise_for_status()
    files = response.json()["entries"]

    if not files:
        return

    print(f"Deleting {len(files)} existing files from draft...")
    for entry in files:
        filename = entry["key"]
        # Percent-encode the key so characters such as '#', '?' or '%' are
        # not read as URL syntax; '/' is left untouched.
        delete_url = f"{files_url}/{quote(filename)}"
        delete_response = requests.delete(delete_url, headers=headers)
        delete_response.raise_for_status()
        print(f" Deleted: {filename}")

147 

148 

def update_draft_metadata(base_url: str, token: str, record_id: str, metadata: dict, user_agent: str) -> None:
    """Replace the content of draft ``record_id`` with ``metadata`` via a PUT request.

    On an error status the status code and response body are printed before
    the HTTP error is raised.

    Raises:
        requests.exceptions.HTTPError: If the request returns an error status.
    """
    draft_url = f"{base_url}/records/{record_id}/draft"
    json_headers = get_headers(token, user_agent, "application/json")
    response = requests.put(draft_url, headers=json_headers, json=metadata)

    if not response.ok:
        print(f"Error updating metadata: {response.status_code}")
        print(f"Response: {response.text}")
    response.raise_for_status()

    print(f"Metadata updated for draft {record_id}")

160 

161 

def submit_community_review(base_url: str, token: str, record_id: str, community_id: str, user_agent: str) -> None:
    """Attach a community-submission review to the draft and submit it.

    Two requests are sent in order: a PUT that sets the review with the
    community as receiver, then a POST to the ``submit-review`` action with
    an empty HTML body.

    Raises:
        requests.exceptions.HTTPError: If either request returns an error status.
    """
    json_headers = get_headers(token, user_agent, "application/json")
    draft_url = f"{base_url}/records/{record_id}/draft"

    review_request = {"receiver": {"community": community_id}, "type": "community-submission"}
    response = requests.put(f"{draft_url}/review", headers=json_headers, json=review_request)
    if not response.ok:
        print(f"Error submitting community review: {response.status_code}")
        print(f"Response: {response.text}")
    response.raise_for_status()

    submission = requests.post(
        f"{draft_url}/actions/submit-review",
        headers=json_headers,
        json={"body": "", "format": "html"},
    )
    submission.raise_for_status()
    print(f"Submitted review for community {community_id}")

180 

181 

def publish_draft(base_url: str, token: str, record_id: str, user_agent: str) -> dict:
    """Publish the draft ``record_id`` and return the published record as JSON.

    On an error status the status code and response body are printed before
    the HTTP error is raised. On success the record's ``self_html`` link is
    printed.

    Raises:
        requests.exceptions.HTTPError: If the request returns an error status.
    """
    publish_url = f"{base_url}/records/{record_id}/draft/actions/publish"
    response = requests.post(publish_url, headers=get_headers(token, user_agent))

    if not response.ok:
        print(f"Error publishing draft: {response.status_code}")
        print(f"Response: {response.text}")
    response.raise_for_status()

    record = response.json()
    print(f"Published: {record['links']['self_html']}")
    return record

194 

195 

def text_to_html(text: str) -> str:
    """Convert plain text into a minimal HTML fragment.

    Paragraphs are separated by blank lines. A paragraph whose first line
    starts with ``"- "`` becomes a ``<ul>`` with one ``<li>`` per ``"- "``
    line; any other paragraph becomes a ``<p>`` whose lines are joined with
    ``<br>``. The text is inserted as-is, without HTML escaping.

    Args:
        text: The plain-text source.

    Returns:
        The concatenated HTML, or an empty string when ``text`` contains no
        non-blank content.
    """
    html_parts = []
    for paragraph in text.strip().split("\n\n"):
        block = paragraph.strip()
        if not block:
            # Three or more consecutive newlines leave empty chunks behind;
            # skip them instead of emitting "<p></p>".
            continue

        lines = block.split("\n")
        if lines[0].startswith("- "):
            items = []
            for line in lines:
                content = line.strip()
                if content.startswith("- "):
                    items.append(content[2:])
                elif content:
                    # Continuation of the previous bullet: keep the text
                    # rather than dropping it. The first line is always a
                    # bullet, so ``items`` is never empty here.
                    items[-1] = f"{items[-1]} {content}"
            list_items = "".join(f"<li>{item}</li>" for item in items)
            html_parts.append(f"<ul>{list_items}</ul>")
        else:
            html_parts.append(f"<p>{'<br>'.join(lines)}</p>")
    return "".join(html_parts)

207 

208 

def build_inveniordm_payload(config: dict) -> dict:
    """Assemble the record payload from the loaded configuration.

    ``title``, ``resource_type``, ``creators``, ``publication_date`` and
    ``access`` are required keys of ``config``. Descriptions are converted
    with ``text_to_html``; a fixed set of optional fields is copied through
    unchanged; ``publisher`` defaults to ``"Zenodo"``; string references are
    wrapped as ``{"reference": ...}``.

    Returns:
        A dictionary with the keys ``access``, ``files`` (always enabled) and
        ``metadata``.

    Raises:
        KeyError: If a required key is missing from ``config``.
    """
    required_fields = ("title", "resource_type", "creators", "publication_date")
    passthrough_fields = (
        "subjects", "languages", "dates", "related_identifiers",
        "rights", "contributors", "funding",
        "version", "locations", "identifiers",
    )

    metadata = {name: config[name] for name in required_fields}

    if "description" in config:
        metadata["description"] = text_to_html(config["description"])

    if "additional_descriptions" in config:
        extra_descriptions = []
        for entry in config["additional_descriptions"]:
            extra_descriptions.append(
                {"description": text_to_html(entry["description"]), "type": entry["type"]}
            )
        metadata["additional_descriptions"] = extra_descriptions

    metadata.update({name: config[name] for name in passthrough_fields if name in config})

    metadata["publisher"] = config.get("publisher", "Zenodo")

    if "references" in config:
        normalized_refs = []
        for ref in config["references"]:
            if isinstance(ref, str):
                normalized_refs.append({"reference": ref})
            else:
                normalized_refs.append(ref)
        metadata["references"] = normalized_refs

    return {
        "access": config["access"],
        "files": {"enabled": True},
        "metadata": metadata,
    }

245 

246 

def main(config_file: str, publish: bool = False) -> dict:
    """Run the upload workflow described by a YAML configuration file.

    With a ``record_id`` in the configuration, a new version draft is
    created, its existing files are deleted and its metadata replaced;
    otherwise a brand-new draft is created. Every path listed under
    ``files`` is then uploaded. A community review is submitted only for new
    records, when ``community`` is set and the base URL does not contain
    ``"sandbox"``.

    Args:
        config_file: Path of the YAML configuration file.
        publish: Publish the draft after uploading when true.

    Returns:
        The published record when ``publish`` is true, otherwise the draft.
    """
    # Read the YAML as UTF-8 explicitly; without an encoding the platform's
    # locale encoding is used, which can corrupt non-ASCII metadata.
    with open(config_file, encoding="utf-8") as f:
        config = yaml.safe_load(f)

    base_url = config["zenodo_url"].rstrip("/")
    token = config["access_token"]
    user_agent = config["user_agent"]
    record_id = config.get("record_id")
    community = config.get("community")

    payload = build_inveniordm_payload(config)

    if record_id:
        draft = create_new_version(base_url, token, record_id, user_agent)
        draft_id = draft["id"]
        delete_draft_files(base_url, token, draft_id, user_agent)
        update_draft_metadata(base_url, token, draft_id, payload, user_agent)
    else:
        draft = create_draft(base_url, token, user_agent, payload)
        draft_id = draft["id"]

    print(f"Draft ID: {draft_id}")
    print(f"Files to upload: {len(config['files'])}")

    for file_path in config["files"]:
        upload_file_with_retry(base_url, draft_id, file_path, token, user_agent)

    if community and "sandbox" not in base_url and not record_id:
        submit_community_review(base_url, token, draft_id, community, user_agent)

    if publish:
        return publish_draft(base_url, token, draft_id, user_agent)

    print(f"\nDraft ready for review: {base_url.replace('/api', '')}/uploads/{draft_id}")
    print("Run with --publish to publish automatically")
    return draft

284 

285 

if __name__ == "__main__":  # pragma: no cover
    # Command-line entry point: one positional config path plus an optional
    # --publish flag, both forwarded to main().
    cli = argparse.ArgumentParser()
    cli.add_argument("config_file")
    cli.add_argument("--publish", action="store_true", help="Publish after upload")
    options = cli.parse_args()
    main(options.config_file, options.publish)