Coverage for src / piccione / upload / on_zenodo.py: 78%

223 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-27 20:21 +0000

1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

import argparse
import time
from pathlib import Path
from typing import TYPE_CHECKING, TypedDict, cast
from urllib.parse import quote

11 

12if TYPE_CHECKING: 

13 from typing import Self 

14 

15import requests 

16import yaml 

17from rich.console import Console 

18from rich.progress import BarColumn, DownloadColumn, Progress, TaskID, TimeRemainingColumn, TransferSpeedColumn 

19 

20console = Console() 

21 

22 

class AdditionalDescription(TypedDict):
    """One entry of the ``additional_descriptions`` list in the metadata config."""

    # Source text of the description; build_inveniordm_payload runs it
    # through text_to_html before sending it.
    description: str
    # Description type mapping; copied into the payload unchanged.
    type: dict[str, str]

26 

27 

28class _InvenioRDMMetadataRequired(TypedDict): 

29 title: str 

30 publication_date: str 

31 resource_type: dict[str, str] 

32 creators: list[dict[str, object]] 

33 

34 

class InvenioRDMMetadata(_InvenioRDMMetadataRequired, total=False):
    """Full metadata section: the required keys plus the optional ones below.

    Every key declared here is optional (``total=False``). Unless noted,
    build_inveniordm_payload copies a present key into the payload as-is.
    """

    # Converted to HTML with text_to_html when present.
    description: str
    # Each entry's "description" is converted to HTML; "type" is kept as-is.
    additional_descriptions: list[AdditionalDescription]
    subjects: list[dict[str, str]]
    languages: list[dict[str, str]]
    dates: list[dict[str, object]]
    related_identifiers: list[dict[str, object]]
    rights: list[dict[str, object]]
    contributors: list[dict[str, object]]
    funding: list[dict[str, object]]
    version: str
    locations: list[dict[str, object]]
    identifiers: list[dict[str, str]]
    # Falls back to "Zenodo" in the payload when absent.
    publisher: str
    # Plain strings are wrapped as {"reference": <string>}; dicts are kept.
    references: list[str | dict[str, str]]

50 

51 

class InvenioRDMPayload(TypedDict):
    """JSON body sent when creating a draft or updating its metadata."""

    # Access settings, taken verbatim from the config's "access" key.
    access: dict[str, str]
    # build_inveniordm_payload always sets this to {"enabled": True}.
    files: dict[str, bool]
    # Record metadata assembled by build_inveniordm_payload.
    metadata: InvenioRDMMetadata

56 

57 

class ZenodoDraft(TypedDict):
    """Subset of a draft response that this module relies on."""

    # Draft identifier; callers normalise it with str() before building URLs.
    id: str | int

60 

61 

class ZenodoPublished(TypedDict):
    """Subset of a publish response that this module relies on."""

    id: str | int
    # publish_draft reads links["self_html"] to report the published URL.
    links: dict[str, str]

65 

66 

class _ZenodoConfigRequired(InvenioRDMMetadata):
    """Connection and upload settings that main() reads from the YAML config.

    The metadata keys are inherited, so a single flat config mapping carries
    both the record metadata and the settings below.
    """

    # API base URL; main() strips any trailing "/".
    zenodo_url: str
    # Sent as a bearer token in the Authorization header.
    access_token: str
    # Sent as the User-Agent header.
    user_agent: str
    # Forwarded unchanged as the payload's "access" section.
    access: dict[str, str]
    # Paths of the files uploaded to the draft, in order.
    files: list[str]

73 

74 

class ZenodoConfig(_ZenodoConfigRequired, total=False):
    """Complete config mapping: required settings plus optional switches."""

    # When set, main() creates a new version of this record instead of a
    # brand-new draft.
    record_id: str
    # Community slug; main() submits a community review for new drafts when
    # the base URL does not contain "sandbox".
    community: str

78 

79 

def get_headers(token: str, user_agent: str, content_type: str | None = None) -> dict[str, str]:
    """Build the HTTP headers used for every API request.

    Args:
        token: Access token, sent as ``Authorization: Bearer <token>``.
        user_agent: Value of the ``User-Agent`` header.
        content_type: Optional ``Content-Type`` value; omitted from the
            result when ``None`` or empty.

    Returns:
        A new header dictionary.
    """
    base = {"Authorization": f"Bearer {token}", "User-Agent": user_agent}
    if not content_type:
        return base
    return {**base, "Content-Type": content_type}

88 

89 

class ProgressFileWrapper:
    """Binary file reader that reports every read to a progress task.

    The object exposes ``read`` and ``__len__`` so it can be handed to an
    HTTP client as a request body, and it is a context manager that closes
    the underlying file on exit.
    """

    def __init__(self, file_path: str, progress: Progress, task_id: TaskID) -> None:
        """Record the file size and open ``file_path`` for binary reading."""
        source = Path(file_path)
        self.file_path = file_path
        self.progress = progress
        self.task_id = task_id
        self.file_size = source.stat().st_size
        # Kept open beyond this method on purpose; closed in close()/__exit__.
        self.fp = source.open("rb")  # noqa: SIM115

    def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes and advance the progress task by the amount read."""
        chunk = self.fp.read(size)
        self.progress.update(self.task_id, advance=len(chunk))
        return chunk

    def __len__(self) -> int:
        """Return the file size in bytes as measured at construction time."""
        return self.file_size

    def close(self) -> None:
        """Close the underlying file object."""
        self.fp.close()

    def __enter__(self) -> Self:
        """Return the wrapper itself; the file is already open."""
        return self

    def __exit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: object,
    ) -> None:
        """Close the file; exceptions are not suppressed."""
        self.close()

119 

120 

def upload_file_with_retry(base_url: str, record_id: str, file_path: str, token: str, user_agent: str) -> None:
    """Upload one file to a draft record, retrying on network failures.

    Each attempt runs three requests: register the file key on the draft,
    stream the file content, then commit the file. Timeouts and connection
    errors are retried indefinitely with exponential backoff capped at 60
    seconds; HTTP error responses are reported and re-raised immediately.

    Args:
        base_url: API base URL without a trailing slash.
        record_id: Identifier of the draft that receives the file.
        file_path: Path of the local file; its base name is used as the key.
        token: Access token for the Authorization header.
        user_agent: Value of the User-Agent header.

    Raises:
        requests.exceptions.HTTPError: If any of the three requests returns
            an error status.
    """
    filename = Path(file_path).name
    file_size = Path(file_path).stat().st_size
    files_url = f"{base_url}/records/{record_id}/draft/files"
    # The key becomes a URL path segment, so it must be percent-encoded;
    # otherwise characters such as "#", "?" or "%" change the request target.
    file_url = f"{files_url}/{quote(filename, safe='')}"

    attempt = 0
    while True:
        attempt += 1
        try:
            console.print(f"\nAttempt {attempt}: {filename}")

            # NOTE(review): this registration is repeated on every retry; if
            # the server rejects re-registering an existing key, a retry after
            # a failed content upload ends in HTTPError — confirm against the API.
            response = requests.post(
                files_url,
                headers=get_headers(token, user_agent, "application/json"),
                json=[{"key": filename}],
                timeout=30,
            )
            response.raise_for_status()

            with Progress(
                "[progress.description]{task.description}",
                BarColumn(),
                DownloadColumn(),
                TransferSpeedColumn(),
                TimeRemainingColumn(),
            ) as progress:
                task_id = progress.add_task(filename, total=file_size)
                with ProgressFileWrapper(file_path, progress, task_id) as wrapper:
                    response = requests.put(
                        f"{file_url}/content",
                        data=wrapper,
                        headers=get_headers(token, user_agent, "application/octet-stream"),
                        # 30 s to connect, up to an hour for the transfer itself.
                        timeout=(30, 3600),
                    )
            response.raise_for_status()

            response = requests.post(
                f"{file_url}/commit",
                headers=get_headers(token, user_agent),
                timeout=30,
            )
            response.raise_for_status()

            console.print(f"[OK] {filename} uploaded successfully")
            break

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            console.print(f"[ERROR] Network error: {e}")
            # Backoff doubles per attempt (1, 2, 4, ...) and is capped at 60 s.
            wait = min(2 ** (attempt - 1), 60)
            console.print(f"Retrying in {wait}s...")
            time.sleep(wait)
        except requests.exceptions.HTTPError as e:
            console.print(f"[ERROR] HTTP error: {e}")
            raise

173 

174 

def create_draft(base_url: str, token: str, user_agent: str, metadata: InvenioRDMPayload) -> ZenodoDraft:
    """Create a new draft record from ``metadata`` and return the parsed response.

    On an error response the status code and body are printed before the
    HTTP error is raised.

    Raises:
        requests.exceptions.HTTPError: If the server returns an error status.
    """
    records_url = f"{base_url}/records"
    json_headers = get_headers(token, user_agent, "application/json")
    response = requests.post(records_url, headers=json_headers, json=metadata, timeout=30)

    if not response.ok:
        for message in (
            f"Error creating draft: {response.status_code}",
            f"Response: {response.text}",
        ):
            console.print(message)
    response.raise_for_status()

    created = response.json()
    console.print(f"Created new draft: {created['id']}")
    return created

189 

190 

def create_new_version(base_url: str, token: str, record_id: str, user_agent: str) -> ZenodoDraft:
    """Request a new version of ``record_id`` and return the resulting draft.

    Raises:
        requests.exceptions.HTTPError: If the server returns an error status.
    """
    versions_url = f"{base_url}/records/{record_id}/versions"
    response = requests.post(versions_url, headers=get_headers(token, user_agent), timeout=30)
    response.raise_for_status()

    new_draft = response.json()
    console.print(f"Created new version draft: {new_draft['id']} (from {record_id})")
    return new_draft

202 

203 

def delete_draft_files(base_url: str, token: str, record_id: str, user_agent: str) -> None:
    """Delete every file currently attached to the draft ``record_id``.

    The file list is read from the ``entries`` key of the draft-files
    response, and each entry is removed by its ``key``.

    Raises:
        requests.exceptions.HTTPError: If listing the files or deleting any
            of them returns an error status.
    """
    headers = get_headers(token, user_agent)
    files_url = f"{base_url}/records/{record_id}/draft/files"
    response = requests.get(files_url, headers=headers, timeout=30)
    response.raise_for_status()
    files = response.json()["entries"]

    if files:
        console.print(f"Deleting {len(files)} existing files from draft...")
    for f in files:
        filename = f["key"]
        # The key is a URL path segment: percent-encode it so characters
        # such as "#", "?" or "%" do not alter the request target.
        delete_url = f"{files_url}/{quote(filename, safe='')}"
        delete_response = requests.delete(delete_url, headers=headers, timeout=30)
        delete_response.raise_for_status()
        console.print(f" Deleted: {filename}")

222 

223 

def update_draft_metadata(
    base_url: str,
    token: str,
    record_id: str,
    metadata: InvenioRDMPayload,
    user_agent: str,
) -> None:
    """Replace the metadata of draft ``record_id`` with ``metadata``.

    On an error response the status code and body are printed before the
    HTTP error is raised.

    Raises:
        requests.exceptions.HTTPError: If the server returns an error status.
    """
    draft_url = f"{base_url}/records/{record_id}/draft"
    json_headers = get_headers(token, user_agent, "application/json")
    response = requests.put(draft_url, headers=json_headers, json=metadata, timeout=30)

    if not response.ok:
        for message in (
            f"Error updating metadata: {response.status_code}",
            f"Response: {response.text}",
        ):
            console.print(message)
    response.raise_for_status()

    console.print(f"Metadata updated for draft {record_id}")

242 

243 

def _resolve_community_id(base_url: str, community_slug: str) -> str:
    """Look up a community by slug and return the ``id`` from the response.

    The request is sent without authentication headers.

    Raises:
        requests.exceptions.HTTPError: If the server returns an error status.
    """
    community_url = f"{base_url}/communities/{community_slug}"
    lookup = requests.get(community_url, timeout=30)
    lookup.raise_for_status()
    community = lookup.json()
    return community["id"]

248 

249 

def submit_community_review(base_url: str, token: str, record_id: str, community_slug: str, user_agent: str) -> None:
    """Attach draft ``record_id`` to a community review and submit it.

    The slug is resolved to the community id first. A review request of type
    ``community-submission`` is then created on the draft and submitted with
    a fixed "Automated submission" message. On an error response the status
    code and body are printed before the HTTP error is raised.

    Raises:
        requests.exceptions.HTTPError: If the community lookup, the review
            creation or the submission returns an error status.
    """
    headers = get_headers(token, user_agent, "application/json")
    community_uuid = _resolve_community_id(base_url, community_slug)
    draft_url = f"{base_url}/records/{record_id}/draft"

    # Step 1: create the review request that targets the community.
    review_request = {"receiver": {"community": community_uuid}, "type": "community-submission"}
    response = requests.put(f"{draft_url}/review", headers=headers, json=review_request, timeout=30)
    if not response.ok:
        console.print(f"Error creating community review: {response.status_code}")
        console.print(f"Response: {response.text}")
    response.raise_for_status()

    # Step 2: submit the review with its accompanying message.
    submission = {"payload": {"content": "Automated submission", "format": "html"}}
    response = requests.post(
        f"{draft_url}/actions/submit-review",
        headers=headers,
        json=submission,
        timeout=30,
    )
    if not response.ok:
        console.print(f"Error submitting community review: {response.status_code}")
        console.print(f"Response: {response.text}")
    response.raise_for_status()

    console.print(f"Submitted review for community {community_slug}")

274 

275 

def publish_draft(base_url: str, token: str, record_id: str, user_agent: str) -> ZenodoPublished:
    """Publish draft ``record_id`` and return the parsed response.

    On an error response the status code and body are printed before the
    HTTP error is raised. On success the record's ``self_html`` link is
    printed.

    Raises:
        requests.exceptions.HTTPError: If the server returns an error status.
    """
    publish_url = f"{base_url}/records/{record_id}/draft/actions/publish"
    response = requests.post(publish_url, headers=get_headers(token, user_agent), timeout=30)

    if not response.ok:
        for message in (
            f"Error publishing draft: {response.status_code}",
            f"Response: {response.text}",
        ):
            console.print(message)
    response.raise_for_status()

    record = response.json()
    console.print(f"Published: {record['links']['self_html']}")
    return record

289 

290 

def text_to_html(text: str) -> str:
    """Convert lightly structured plain text into a minimal HTML fragment.

    Paragraphs are separated by blank lines. A paragraph whose first line
    starts with ``"- "`` becomes a ``<ul>``: every ``"- "`` line opens a new
    ``<li>``, and any other non-empty line is appended to the preceding item
    separated by a space (instead of being dropped). Any other paragraph
    becomes a ``<p>`` with its lines joined by ``<br>``.

    Empty paragraphs, as produced by runs of blank lines or by empty input,
    are skipped, so no ``<p></p>`` is emitted. The text is inserted verbatim:
    no HTML escaping is applied, so inline markup in the input is preserved.

    Args:
        text: Source text.

    Returns:
        The concatenated HTML fragment; ``""`` when ``text`` is blank.
    """
    html_parts: list[str] = []
    for paragraph in text.strip().split("\n\n"):
        lines = paragraph.strip().split("\n")
        if lines == [""]:
            # An odd-length run of newlines leaves an empty chunk behind.
            continue
        if lines[0].strip().startswith("- "):
            items: list[str] = []
            for line in lines:
                stripped = line.strip()
                if stripped.startswith("- "):
                    items.append(stripped[2:])
                elif stripped:
                    # Continuation of the previous bullet; the first line is
                    # always a bullet here, so items is never empty.
                    items[-1] = f"{items[-1]} {stripped}"
            html_parts.append("<ul>" + "".join(f"<li>{item}</li>" for item in items) + "</ul>")
        else:
            html_parts.append(f"<p>{'<br>'.join(lines)}</p>")
    return "".join(html_parts)

302 

303 

def build_inveniordm_payload(metadata_config: InvenioRDMMetadata, access: dict[str, str]) -> InvenioRDMPayload:
    """Assemble the record payload from the metadata config.

    Args:
        metadata_config: Mapping holding the metadata keys; unrelated keys
            are ignored.
        access: Access settings placed under the payload's ``access`` key.

    Returns:
        A payload with ``access``, ``files`` (always ``{"enabled": True}``)
        and ``metadata``. Descriptions are converted with ``text_to_html``,
        ``publisher`` defaults to ``"Zenodo"``, and string references are
        wrapped as ``{"reference": <string>}``.

    Raises:
        KeyError: If ``title``, ``resource_type``, ``creators`` or
            ``publication_date`` is missing.
    """
    # Optional keys that are forwarded untouched, in payload order.
    passthrough_fields = (
        "subjects",
        "languages",
        "dates",
        "related_identifiers",
        "rights",
        "contributors",
        "funding",
        "version",
        "locations",
        "identifiers",
    )

    record_metadata: dict[str, object] = {
        "title": metadata_config["title"],
        "resource_type": metadata_config["resource_type"],
        "creators": metadata_config["creators"],
        "publication_date": metadata_config["publication_date"],
    }

    if "description" in metadata_config:
        record_metadata["description"] = text_to_html(metadata_config["description"])

    if "additional_descriptions" in metadata_config:
        extra_descriptions = metadata_config["additional_descriptions"]
        record_metadata["additional_descriptions"] = [
            {"description": text_to_html(entry["description"]), "type": entry["type"]}
            for entry in extra_descriptions
        ]

    raw_config = cast("dict[str, object]", metadata_config)
    record_metadata.update({name: raw_config[name] for name in passthrough_fields if name in raw_config})

    record_metadata["publisher"] = metadata_config.get("publisher", "Zenodo")

    if "references" in metadata_config:
        record_metadata["references"] = [
            ref if not isinstance(ref, str) else {"reference": ref}
            for ref in metadata_config["references"]
        ]

    payload = {
        "access": access,
        "files": {"enabled": True},
        "metadata": record_metadata,
    }
    return cast("InvenioRDMPayload", payload)

352 

353 

def main(config_file: str, *, publish: bool = False) -> ZenodoDraft | ZenodoPublished:
    """Create or update a draft from a YAML config, upload its files, optionally publish.

    Workflow:
      1. Load the config and build the record payload.
      2. With ``record_id`` set: create a new version, delete the files it
         carries and update its metadata. Otherwise create a new draft.
      3. Upload every path listed under ``files``.
      4. For a new draft with ``community`` set, submit a community review,
         unless the base URL contains ``"sandbox"``.
      5. Publish when ``publish`` is true; otherwise print the draft URL.

    Args:
        config_file: Path of the YAML configuration file.
        publish: Publish the draft after the upload.

    Returns:
        The publish response when ``publish`` is true, else the draft.

    Raises:
        KeyError: If a required config key is missing.
        requests.exceptions.HTTPError: If any API call returns an error status.
    """
    # YAML is read as UTF-8 explicitly: the platform default encoding would
    # mis-decode non-ASCII metadata on non-UTF-8 locales.
    with Path(config_file).open(encoding="utf-8") as f:
        config = cast("ZenodoConfig", yaml.safe_load(f))

    base_url = config["zenodo_url"].rstrip("/")
    token = config["access_token"]
    user_agent = config["user_agent"]
    record_id = config.get("record_id")
    community = config.get("community")

    payload = build_inveniordm_payload(config, config["access"])

    if record_id:
        draft = create_new_version(base_url, token, record_id, user_agent)
        draft_id = str(draft["id"])
        # The new version's existing files are removed so that only the
        # files listed in the config end up on it.
        delete_draft_files(base_url, token, draft_id, user_agent)
        update_draft_metadata(base_url, token, draft_id, payload, user_agent)
    else:
        draft = create_draft(base_url, token, user_agent, payload)
        draft_id = str(draft["id"])

    console.print(f"Draft ID: {draft_id}")
    console.print(f"Files to upload: {len(config['files'])}")

    for file_path in config["files"]:
        upload_file_with_retry(base_url, draft_id, str(file_path), token, user_agent)

    if community and "sandbox" not in base_url and not record_id:
        submit_community_review(base_url, token, draft_id, str(community), user_agent)

    if publish:
        return publish_draft(base_url, token, draft_id, user_agent)
    console.print(f"\nDraft ready for review: {base_url.replace('/api', '')}/uploads/{draft_id}")
    console.print("Run with --publish to publish automatically")
    return draft

389 

390 

if __name__ == "__main__":  # pragma: no cover
    # Command-line entry point: a config path plus an optional --publish flag.
    cli = argparse.ArgumentParser()
    cli.add_argument("config_file")
    cli.add_argument("--publish", action="store_true", help="Publish after upload")
    options = cli.parse_args()
    main(options.config_file, publish=options.publish)
396 main(args.config_file, publish=args.publish)