Coverage for src / piccione / upload / on_zenodo.py: 78%
223 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-27 20:21 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-27 20:21 +0000
1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7import argparse
8import time
9from pathlib import Path
10from typing import TYPE_CHECKING, TypedDict, cast
12if TYPE_CHECKING:
13 from typing import Self
15import requests
16import yaml
17from rich.console import Console
18from rich.progress import BarColumn, DownloadColumn, Progress, TaskID, TimeRemainingColumn, TransferSpeedColumn
20console = Console()
23class AdditionalDescription(TypedDict):
24 description: str
25 type: dict[str, str]
28class _InvenioRDMMetadataRequired(TypedDict):
29 title: str
30 publication_date: str
31 resource_type: dict[str, str]
32 creators: list[dict[str, object]]
35class InvenioRDMMetadata(_InvenioRDMMetadataRequired, total=False):
36 description: str
37 additional_descriptions: list[AdditionalDescription]
38 subjects: list[dict[str, str]]
39 languages: list[dict[str, str]]
40 dates: list[dict[str, object]]
41 related_identifiers: list[dict[str, object]]
42 rights: list[dict[str, object]]
43 contributors: list[dict[str, object]]
44 funding: list[dict[str, object]]
45 version: str
46 locations: list[dict[str, object]]
47 identifiers: list[dict[str, str]]
48 publisher: str
49 references: list[str | dict[str, str]]
52class InvenioRDMPayload(TypedDict):
53 access: dict[str, str]
54 files: dict[str, bool]
55 metadata: InvenioRDMMetadata
58class ZenodoDraft(TypedDict):
59 id: str | int
62class ZenodoPublished(TypedDict):
63 id: str | int
64 links: dict[str, str]
67class _ZenodoConfigRequired(InvenioRDMMetadata):
68 zenodo_url: str
69 access_token: str
70 user_agent: str
71 access: dict[str, str]
72 files: list[str]
75class ZenodoConfig(_ZenodoConfigRequired, total=False):
76 record_id: str
77 community: str
80def get_headers(token: str, user_agent: str, content_type: str | None = None) -> dict[str, str]:
81 headers = {
82 "Authorization": f"Bearer {token}",
83 "User-Agent": user_agent,
84 }
85 if content_type:
86 headers["Content-Type"] = content_type
87 return headers
90class ProgressFileWrapper:
91 def __init__(self, file_path: str, progress: Progress, task_id: TaskID) -> None:
92 self.file_path = file_path
93 self.file_size = Path(file_path).stat().st_size
94 self.fp = Path(file_path).open("rb") # noqa: SIM115
95 self.progress = progress
96 self.task_id = task_id
98 def read(self, size: int = -1) -> bytes:
99 data = self.fp.read(size)
100 self.progress.update(self.task_id, advance=len(data))
101 return data
103 def __len__(self) -> int:
104 return self.file_size
106 def close(self) -> None:
107 self.fp.close()
109 def __enter__(self) -> Self:
110 return self
112 def __exit__(
113 self,
114 _exc_type: type[BaseException] | None,
115 _exc_val: BaseException | None,
116 _exc_tb: object,
117 ) -> None:
118 self.close()
121def upload_file_with_retry(base_url: str, record_id: str, file_path: str, token: str, user_agent: str) -> None:
122 filename = Path(file_path).name
123 file_size = Path(file_path).stat().st_size
124 files_url = f"{base_url}/records/{record_id}/draft/files"
126 attempt = 0
127 while True:
128 attempt += 1
129 try:
130 console.print(f"\nAttempt {attempt}: {filename}")
132 response = requests.post(
133 files_url,
134 headers=get_headers(token, user_agent, "application/json"),
135 json=[{"key": filename}],
136 timeout=30,
137 )
138 response.raise_for_status()
140 content_url = f"{files_url}/{filename}/content"
141 with Progress(
142 "[progress.description]{task.description}",
143 BarColumn(),
144 DownloadColumn(),
145 TransferSpeedColumn(),
146 TimeRemainingColumn(),
147 ) as progress:
148 task_id = progress.add_task(filename, total=file_size)
149 with ProgressFileWrapper(file_path, progress, task_id) as wrapper:
150 response = requests.put(
151 content_url,
152 data=wrapper,
153 headers=get_headers(token, user_agent, "application/octet-stream"),
154 timeout=(30, 3600),
155 )
156 response.raise_for_status()
158 commit_url = f"{files_url}/{filename}/commit"
159 response = requests.post(commit_url, headers=get_headers(token, user_agent), timeout=30)
160 response.raise_for_status()
162 console.print(f"[OK] {filename} uploaded successfully")
163 break
165 except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
166 console.print(f"[ERROR] Network error: {e}")
167 wait = min(2 ** (attempt - 1), 60)
168 console.print(f"Retrying in {wait}s...")
169 time.sleep(wait)
170 except requests.exceptions.HTTPError as e:
171 console.print(f"[ERROR] HTTP error: {e}")
172 raise
175def create_draft(base_url: str, token: str, user_agent: str, metadata: InvenioRDMPayload) -> ZenodoDraft:
176 response = requests.post(
177 f"{base_url}/records",
178 headers=get_headers(token, user_agent, "application/json"),
179 json=metadata,
180 timeout=30,
181 )
182 if not response.ok:
183 console.print(f"Error creating draft: {response.status_code}")
184 console.print(f"Response: {response.text}")
185 response.raise_for_status()
186 draft = response.json()
187 console.print(f"Created new draft: {draft['id']}")
188 return draft
191def create_new_version(base_url: str, token: str, record_id: str, user_agent: str) -> ZenodoDraft:
192 headers = get_headers(token, user_agent)
193 response = requests.post(
194 f"{base_url}/records/{record_id}/versions",
195 headers=headers,
196 timeout=30,
197 )
198 response.raise_for_status()
199 draft = response.json()
200 console.print(f"Created new version draft: {draft['id']} (from {record_id})")
201 return draft
204def delete_draft_files(base_url: str, token: str, record_id: str, user_agent: str) -> None:
205 headers = get_headers(token, user_agent)
206 response = requests.get(
207 f"{base_url}/records/{record_id}/draft/files",
208 headers=headers,
209 timeout=30,
210 )
211 response.raise_for_status()
212 files = response.json()["entries"]
214 if files:
215 console.print(f"Deleting {len(files)} existing files from draft...")
216 for f in files:
217 filename = f["key"]
218 delete_url = f"{base_url}/records/{record_id}/draft/files/{filename}"
219 delete_response = requests.delete(delete_url, headers=headers, timeout=30)
220 delete_response.raise_for_status()
221 console.print(f" Deleted: {filename}")
224def update_draft_metadata(
225 base_url: str,
226 token: str,
227 record_id: str,
228 metadata: InvenioRDMPayload,
229 user_agent: str,
230) -> None:
231 response = requests.put(
232 f"{base_url}/records/{record_id}/draft",
233 headers=get_headers(token, user_agent, "application/json"),
234 json=metadata,
235 timeout=30,
236 )
237 if not response.ok:
238 console.print(f"Error updating metadata: {response.status_code}")
239 console.print(f"Response: {response.text}")
240 response.raise_for_status()
241 console.print(f"Metadata updated for draft {record_id}")
244def _resolve_community_id(base_url: str, community_slug: str) -> str:
245 response = requests.get(f"{base_url}/communities/{community_slug}", timeout=30)
246 response.raise_for_status()
247 return response.json()["id"]
250def submit_community_review(base_url: str, token: str, record_id: str, community_slug: str, user_agent: str) -> None:
251 headers = get_headers(token, user_agent, "application/json")
252 community_uuid = _resolve_community_id(base_url, community_slug)
253 response = requests.put(
254 f"{base_url}/records/{record_id}/draft/review",
255 headers=headers,
256 json={"receiver": {"community": community_uuid}, "type": "community-submission"},
257 timeout=30,
258 )
259 if not response.ok:
260 console.print(f"Error creating community review: {response.status_code}")
261 console.print(f"Response: {response.text}")
262 response.raise_for_status()
263 response = requests.post(
264 f"{base_url}/records/{record_id}/draft/actions/submit-review",
265 headers=headers,
266 json={"payload": {"content": "Automated submission", "format": "html"}},
267 timeout=30,
268 )
269 if not response.ok:
270 console.print(f"Error submitting community review: {response.status_code}")
271 console.print(f"Response: {response.text}")
272 response.raise_for_status()
273 console.print(f"Submitted review for community {community_slug}")
276def publish_draft(base_url: str, token: str, record_id: str, user_agent: str) -> ZenodoPublished:
277 response = requests.post(
278 f"{base_url}/records/{record_id}/draft/actions/publish",
279 headers=get_headers(token, user_agent),
280 timeout=30,
281 )
282 if not response.ok:
283 console.print(f"Error publishing draft: {response.status_code}")
284 console.print(f"Response: {response.text}")
285 response.raise_for_status()
286 published = response.json()
287 console.print(f"Published: {published['links']['self_html']}")
288 return published
291def text_to_html(text: str) -> str:
292 paragraphs = text.strip().split("\n\n")
293 html_parts = []
294 for p in paragraphs:
295 lines = p.strip().split("\n")
296 if lines[0].strip().startswith("- "):
297 items = [f"<li>{line.strip()[2:]}</li>" for line in lines if line.strip().startswith("- ")]
298 html_parts.append(f"<ul>{''.join(items)}</ul>")
299 else:
300 html_parts.append(f"<p>{('<br>'.join(lines))}</p>")
301 return "".join(html_parts)
304def build_inveniordm_payload(metadata_config: InvenioRDMMetadata, access: dict[str, str]) -> InvenioRDMPayload:
305 metadata: dict[str, object] = {}
307 metadata["title"] = metadata_config["title"]
308 metadata["resource_type"] = metadata_config["resource_type"]
309 metadata["creators"] = metadata_config["creators"]
310 metadata["publication_date"] = metadata_config["publication_date"]
312 if "description" in metadata_config:
313 metadata["description"] = text_to_html(metadata_config["description"])
315 if "additional_descriptions" in metadata_config:
316 metadata["additional_descriptions"] = [
317 {"description": text_to_html(d["description"]), "type": d["type"]}
318 for d in metadata_config["additional_descriptions"]
319 ]
321 config_dict = cast("dict[str, object]", metadata_config)
322 for field in (
323 "subjects",
324 "languages",
325 "dates",
326 "related_identifiers",
327 "rights",
328 "contributors",
329 "funding",
330 "version",
331 "locations",
332 "identifiers",
333 ):
334 if field in config_dict:
335 metadata[field] = config_dict[field]
337 metadata["publisher"] = metadata_config.get("publisher", "Zenodo")
339 if "references" in metadata_config:
340 metadata["references"] = [
341 {"reference": ref} if isinstance(ref, str) else ref for ref in metadata_config["references"]
342 ]
344 return cast(
345 "InvenioRDMPayload",
346 {
347 "access": access,
348 "files": {"enabled": True},
349 "metadata": metadata,
350 },
351 )
354def main(config_file: str, *, publish: bool = False) -> ZenodoDraft | ZenodoPublished:
355 with Path(config_file).open() as f:
356 config = cast("ZenodoConfig", yaml.safe_load(f))
358 base_url = config["zenodo_url"].rstrip("/")
359 token = config["access_token"]
360 user_agent = config["user_agent"]
361 record_id = config.get("record_id")
362 community = config.get("community")
364 payload = build_inveniordm_payload(config, config["access"])
366 if record_id:
367 draft = create_new_version(base_url, token, record_id, user_agent)
368 draft_id = str(draft["id"])
369 delete_draft_files(base_url, token, draft_id, user_agent)
370 update_draft_metadata(base_url, token, draft_id, payload, user_agent)
371 else:
372 draft = create_draft(base_url, token, user_agent, payload)
373 draft_id = str(draft["id"])
375 console.print(f"Draft ID: {draft_id}")
376 console.print(f"Files to upload: {len(config['files'])}")
378 for file_path in config["files"]:
379 upload_file_with_retry(base_url, draft_id, str(file_path), token, user_agent)
381 if community and "sandbox" not in base_url and not record_id:
382 submit_community_review(base_url, token, draft_id, str(community), user_agent)
384 if publish:
385 return publish_draft(base_url, token, draft_id, user_agent)
386 console.print(f"\nDraft ready for review: {base_url.replace('/api', '')}/uploads/{draft_id}")
387 console.print("Run with --publish to publish automatically")
388 return draft
391if __name__ == "__main__": # pragma: no cover
392 parser = argparse.ArgumentParser()
393 parser.add_argument("config_file")
394 parser.add_argument("--publish", action="store_true", help="Publish after upload")
395 args = parser.parse_args()
396 main(args.config_file, publish=args.publish)