Coverage for src/piccione/upload/on_zenodo.py: 77%

164 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-02-28 16:52 +0000

1import argparse 

2import time 

3from pathlib import Path 

4 

5import requests 

6import yaml 

7from rich.progress import Progress, BarColumn, DownloadColumn, TransferSpeedColumn, TimeRemainingColumn 

8 

9 

10def get_headers(token: str, user_agent: str, content_type: str | None = None) -> dict[str, str]: 

11 headers = { 

12 "Authorization": f"Bearer {token}", 

13 "User-Agent": user_agent, 

14 } 

15 if content_type: 

16 headers["Content-Type"] = content_type 

17 return headers 

18 

19 

class ProgressFileWrapper:
    """File-like reader that reports bytes read to a rich progress task.

    Wraps the file at ``file_path`` opened in binary mode; every
    ``read()`` advances the given progress task by the number of bytes
    returned. Implements ``__len__`` (requests uses it to set
    Content-Length) and the context-manager protocol for cleanup.
    """

    def __init__(self, file_path: str, progress: Progress, task_id: int):
        self.file_path = file_path
        self.file_size = Path(file_path).stat().st_size
        self.fp = open(file_path, "rb")
        self.progress = progress
        self.task_id = task_id

    def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes and advance the progress bar."""
        chunk = self.fp.read(size)
        self.progress.update(self.task_id, advance=len(chunk))
        return chunk

    def __len__(self) -> int:
        # Total upload size, taken once from the file's stat at init.
        return self.file_size

    def close(self) -> None:
        """Close the underlying file handle."""
        self.fp.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

44 

45 

def upload_file_with_retry(base_url: str, record_id: str, file_path: str, token: str, user_agent: str) -> None:
    """Upload one file to a draft record, retrying on network errors.

    Performs the three-step InvenioRDM file upload: register the file
    key on the draft, stream the content (with a rich progress bar),
    then commit the file. Timeouts and connection errors trigger an
    exponential-backoff retry (1s, 2s, 4s, ... capped at 60s) with no
    attempt limit; HTTP errors are printed and re-raised immediately.

    Args:
        base_url: API base URL, e.g. ``https://zenodo.org/api``.
        record_id: Identifier of the draft record.
        file_path: Local path of the file to upload.
        token: Bearer token for authorization.
        user_agent: ``User-Agent`` header value.

    Raises:
        requests.exceptions.HTTPError: On any non-retryable HTTP error.
    """
    filename = Path(file_path).name
    file_size = Path(file_path).stat().st_size
    files_url = f"{base_url}/records/{record_id}/draft/files"

    attempt = 0
    while True:
        attempt += 1
        try:
            print(f"\nAttempt {attempt}: {filename}")

            # Step 1: register the file key on the draft.
            response = requests.post(
                files_url,
                headers=get_headers(token, user_agent, "application/json"),
                json=[{"key": filename}],
            )
            response.raise_for_status()

            # Step 2: stream the file content; the wrapper advances the
            # progress bar as requests pulls chunks from it.
            # BUG FIX: URL must target the registered file key, not a
            # literal placeholder.
            content_url = f"{files_url}/{filename}/content"
            with Progress(
                "[progress.description]{task.description}",
                BarColumn(),
                DownloadColumn(),
                TransferSpeedColumn(),
                TimeRemainingColumn(),
            ) as progress:
                task_id = progress.add_task(filename, total=file_size)
                with ProgressFileWrapper(file_path, progress, task_id) as wrapper:
                    response = requests.put(
                        content_url,
                        data=wrapper,
                        headers=get_headers(token, user_agent, "application/octet-stream"),
                        # 30s to connect, up to 1h for the transfer itself.
                        timeout=(30, 3600),
                    )
                    response.raise_for_status()

            # Step 3: commit the uploaded content.
            commit_url = f"{files_url}/{filename}/commit"
            response = requests.post(commit_url, headers=get_headers(token, user_agent))
            response.raise_for_status()

            print(f"[OK] {filename} uploaded successfully")
            break

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            # Transient network failure: back off and retry the whole
            # register/upload/commit sequence.
            print(f"[ERROR] Network error: {e}")
            wait = min(2 ** (attempt - 1), 60)
            print(f"Retrying in {wait}s...")
            time.sleep(wait)
        except requests.exceptions.HTTPError as e:
            # Server rejected the request; retrying won't help.
            print(f"[ERROR] HTTP error: {e}")
            raise

97 

98 

def create_draft(base_url: str, token: str, user_agent: str, metadata: dict) -> dict:
    """Create a brand-new draft record and return its JSON.

    Prints the server's status and body before raising on failure so
    validation errors are visible to the operator.
    """
    response = requests.post(
        f"{base_url}/records",
        headers=get_headers(token, user_agent, "application/json"),
        json=metadata,
    )
    if not response.ok:
        # Surface the server's explanation before raising.
        print(f"Error creating draft: {response.status_code}")
        print(f"Response: {response.text}")
    response.raise_for_status()
    draft = response.json()
    print(f"Created new draft: {draft['id']}")
    return draft

112 

113 

def create_new_version(base_url: str, token: str, record_id: str, user_agent: str) -> dict:
    """Open a new-version draft of a published record.

    Returns the new draft's JSON (its ``id`` differs from ``record_id``).

    Raises:
        requests.exceptions.HTTPError: If the version request fails.
    """
    versions_url = f"{base_url}/records/{record_id}/versions"
    response = requests.post(versions_url, headers=get_headers(token, user_agent))
    response.raise_for_status()
    draft = response.json()
    print(f"Created new version draft: {draft['id']} (from {record_id})")
    return draft

124 

125 

def delete_draft_files(base_url: str, token: str, record_id: str, user_agent: str) -> None:
    """Remove every file currently attached to a draft record.

    Used when creating a new version so files carried over from the
    previous version can be replaced by a fresh upload.

    Args:
        base_url: API base URL.
        token: Bearer token.
        record_id: Identifier of the draft record.
        user_agent: ``User-Agent`` header value.

    Raises:
        requests.exceptions.HTTPError: If listing or deleting fails.
    """
    headers = get_headers(token, user_agent)
    response = requests.get(
        f"{base_url}/records/{record_id}/draft/files",
        headers=headers,
    )
    response.raise_for_status()
    files = response.json()["entries"]

    if files:
        print(f"Deleting {len(files)} existing files from draft...")
        for f in files:
            filename = f["key"]
            # BUG FIX: the delete URL must target each file's key.
            delete_url = f"{base_url}/records/{record_id}/draft/files/{filename}"
            delete_response = requests.delete(delete_url, headers=headers)
            delete_response.raise_for_status()
            print(f"  Deleted: {filename}")

143 

144 

def update_draft_metadata(base_url: str, token: str, record_id: str, metadata: dict, user_agent: str) -> None:
    """Replace a draft record's metadata with ``metadata``.

    Prints the server's status and body before raising on failure so
    validation errors are visible to the operator.
    """
    draft_url = f"{base_url}/records/{record_id}/draft"
    response = requests.put(
        draft_url,
        headers=get_headers(token, user_agent, "application/json"),
        json=metadata,
    )
    if not response.ok:
        print(f"Error updating metadata: {response.status_code}")
        print(f"Response: {response.text}")
    response.raise_for_status()
    print(f"Metadata updated for draft {record_id}")

156 

157 

def submit_community_review(base_url: str, token: str, record_id: str, community_id: str, user_agent: str) -> None:
    """Attach a community-submission review to a draft and submit it.

    Two API calls: first register the community as the review receiver,
    then trigger the submit-review action with an empty message.

    Raises:
        requests.exceptions.HTTPError: If either call fails.
    """
    headers = get_headers(token, user_agent, "application/json")
    review_payload = {"receiver": {"community": community_id}, "type": "community-submission"}

    resp = requests.put(
        f"{base_url}/records/{record_id}/draft/review",
        headers=headers,
        json=review_payload,
    )
    resp.raise_for_status()

    resp = requests.post(
        f"{base_url}/records/{record_id}/draft/actions/submit-review",
        headers=headers,
        json={"body": "", "format": "html"},
    )
    resp.raise_for_status()
    print(f"Submitted review for community {community_id}")

173 

174 

def publish_draft(base_url: str, token: str, record_id: str, user_agent: str) -> dict:
    """Publish a draft record and return the published record's JSON.

    Prints the server's status and body before raising on failure, and
    the published record's HTML link on success.
    """
    publish_url = f"{base_url}/records/{record_id}/draft/actions/publish"
    response = requests.post(publish_url, headers=get_headers(token, user_agent))
    if not response.ok:
        print(f"Error publishing draft: {response.status_code}")
        print(f"Response: {response.text}")
    response.raise_for_status()
    published = response.json()
    print(f"Published: {published['links']['self_html']}")
    return published

187 

188 

def text_to_html(text: str) -> str:
    """Convert simple plain text into minimal HTML.

    Paragraphs are separated by blank lines. A paragraph whose first
    line starts with ``- `` becomes a ``<ul>`` (lines in that paragraph
    not starting with ``- `` are dropped); any other paragraph becomes
    a ``<p>`` with newlines rendered as ``<br>``. No HTML escaping is
    performed.
    """
    chunks = []
    for block in text.strip().split("\n\n"):
        rows = block.strip().split("\n")
        if rows[0].strip().startswith("- "):
            bullets = "".join(
                f"<li>{row.strip()[2:]}</li>"
                for row in rows
                if row.strip().startswith("- ")
            )
            chunks.append(f"<ul>{bullets}</ul>")
        else:
            chunks.append("<p>" + "<br>".join(rows) + "</p>")
    return "".join(chunks)

200 

201 

def build_inveniordm_payload(config: dict) -> dict:
    """Translate the YAML config dict into an InvenioRDM record payload.

    Required config keys: ``title``, ``resource_type``, ``creators``,
    ``publication_date``, ``access``. Description fields are converted
    from plain text to HTML; a fixed set of optional metadata fields is
    copied through verbatim when present.

    Returns:
        The ``{"access", "files", "metadata"}`` payload with file
        uploads enabled.
    """
    meta = {
        "title": config["title"],
        "resource_type": config["resource_type"],
        "creators": config["creators"],
        "publication_date": config["publication_date"],
    }

    if "description" in config:
        meta["description"] = text_to_html(config["description"])

    if "additional_descriptions" in config:
        meta["additional_descriptions"] = [
            {"description": text_to_html(entry["description"]), "type": entry["type"]}
            for entry in config["additional_descriptions"]
        ]

    # Optional fields copied straight through when the config sets them.
    passthrough = ("subjects", "languages", "dates", "related_identifiers",
                   "rights", "contributors", "funding", "references",
                   "version", "locations", "identifiers", "publisher")
    meta.update({key: config[key] for key in passthrough if key in config})

    return {
        "access": config["access"],
        "files": {"enabled": True},
        "metadata": meta,
    }

233 

234 

def main(config_file: str, publish: bool = False) -> dict:
    """Upload the files described by a YAML config to Zenodo/InvenioRDM.

    Creates a fresh draft (or a new-version draft when ``record_id`` is
    set in the config), uploads every file listed under ``files``,
    optionally submits to a community (skipped on sandbox), and
    publishes when requested.

    Args:
        config_file: Path to the YAML configuration file.
        publish: When True, publish the draft after uploading.

    Returns:
        The published record JSON when ``publish`` is True, otherwise
        the draft JSON.
    """
    with open(config_file) as stream:
        config = yaml.safe_load(stream)

    base_url = config["zenodo_url"].rstrip("/")
    token = config["access_token"]
    user_agent = config["user_agent"]
    record_id = config.get("record_id")
    community = config.get("community")

    payload = build_inveniordm_payload(config)

    if record_id:
        # Existing record: open a new-version draft, then swap out its
        # inherited files and metadata.
        draft = create_new_version(base_url, token, record_id, user_agent)
        draft_id = draft["id"]
        delete_draft_files(base_url, token, draft_id, user_agent)
        update_draft_metadata(base_url, token, draft_id, payload, user_agent)
    else:
        draft = create_draft(base_url, token, user_agent, payload)
        draft_id = draft["id"]

    print(f"Draft ID: {draft_id}")
    print(f"Files to upload: {len(config['files'])}")

    for file_path in config["files"]:
        upload_file_with_retry(base_url, draft_id, file_path, token, user_agent)

    # Community review only applies to the production instance.
    if community and "sandbox" not in base_url:
        submit_community_review(base_url, token, draft_id, community, user_agent)

    if publish:
        return publish_draft(base_url, token, draft_id, user_agent)

    print(f"\nDraft ready for review: {base_url.replace('/api', '')}/uploads/{draft_id}")
    print("Run with --publish to publish automatically")
    return draft

272 

273 

if __name__ == "__main__":  # pragma: no cover
    # CLI entry point: <script> CONFIG_FILE [--publish]
    cli = argparse.ArgumentParser()
    cli.add_argument("config_file")
    cli.add_argument("--publish", action="store_true", help="Publish after upload")
    opts = cli.parse_args()
    main(opts.config_file, opts.publish)