Coverage for src/piccione/upload/on_zenodo.py: 76%
170 statements
« prev ^ index » next — coverage.py v7.13.0, created at 2026-03-21 11:49 +0000
1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import time
7from pathlib import Path
9import requests
10import yaml
11from rich.progress import Progress, BarColumn, DownloadColumn, TransferSpeedColumn, TimeRemainingColumn
14def get_headers(token: str, user_agent: str, content_type: str | None = None) -> dict[str, str]:
15 headers = {
16 "Authorization": f"Bearer {token}",
17 "User-Agent": user_agent,
18 }
19 if content_type:
20 headers["Content-Type"] = content_type
21 return headers
class ProgressFileWrapper:
    """File-like reader that advances a rich progress task as bytes are read.

    requests calls ``__len__`` to set Content-Length and then streams the
    body via ``read``, so every chunk read moves the progress bar.
    """

    def __init__(self, file_path: str, progress: Progress, task_id: int):
        self.file_path = file_path
        self.progress = progress
        self.task_id = task_id
        self.file_size = Path(file_path).stat().st_size
        self.fp = open(file_path, "rb")

    def read(self, size: int = -1) -> bytes:
        # Advance by the number of bytes actually returned (0 at EOF).
        chunk = self.fp.read(size)
        self.progress.update(self.task_id, advance=len(chunk))
        return chunk

    def __len__(self) -> int:
        return self.file_size

    def close(self) -> None:
        self.fp.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
def upload_file_with_retry(base_url: str, record_id: str, file_path: str, token: str, user_agent: str) -> None:
    """Upload one file to a draft record, retrying on transient network errors.

    The InvenioRDM upload is three API calls: register the file entry,
    stream its content (with a progress bar), then commit it. Timeouts and
    connection errors restart the whole sequence with exponential backoff
    (1s, 2s, 4s, ... capped at 60s); HTTP errors are printed and re-raised.

    Raises:
        requests.exceptions.HTTPError: if any step is rejected by the API.
    """
    filename = Path(file_path).name
    file_size = Path(file_path).stat().st_size
    files_url = f"{base_url}/records/{record_id}/draft/files"

    attempt = 0
    while True:
        attempt += 1
        try:
            print(f"\nAttempt {attempt}: {filename}")

            # 1. Register the file entry (its key) on the draft.
            response = requests.post(
                files_url,
                headers=get_headers(token, user_agent, "application/json"),
                json=[{"key": filename}],
            )
            response.raise_for_status()

            # 2. Stream the content; the wrapper advances the progress bar
            #    as requests reads chunks from it.
            content_url = f"{files_url}/{filename}/content"
            with Progress(
                "[progress.description]{task.description}",
                BarColumn(),
                DownloadColumn(),
                TransferSpeedColumn(),
                TimeRemainingColumn(),
            ) as progress:
                task_id = progress.add_task(filename, total=file_size)
                with ProgressFileWrapper(file_path, progress, task_id) as wrapper:
                    response = requests.put(
                        content_url,
                        data=wrapper,
                        headers=get_headers(token, user_agent, "application/octet-stream"),
                        timeout=(30, 3600),  # 30s connect, 1h read for large files
                    )
            response.raise_for_status()

            # 3. Commit the uploaded content to finalize the file.
            commit_url = f"{files_url}/{filename}/commit"
            response = requests.post(commit_url, headers=get_headers(token, user_agent))
            response.raise_for_status()

            print(f"[OK] {filename} uploaded successfully")
            break
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            print(f"[ERROR] Network error: {e}")
            wait = min(2 ** (attempt - 1), 60)
            print(f"Retrying in {wait}s...")
            time.sleep(wait)
        except requests.exceptions.HTTPError as e:
            print(f"[ERROR] HTTP error: {e}")
            raise
def create_draft(base_url: str, token: str, user_agent: str, metadata: dict) -> dict:
    """Create a brand-new draft record and return the API response body.

    On a non-2xx response, prints the status and body before raising.
    """
    resp = requests.post(
        f"{base_url}/records",
        headers=get_headers(token, user_agent, "application/json"),
        json=metadata,
    )
    if not resp.ok:
        print(f"Error creating draft: {resp.status_code}")
        print(f"Response: {resp.text}")
        resp.raise_for_status()
    body = resp.json()
    print(f"Created new draft: {body['id']}")
    return body
def create_new_version(base_url: str, token: str, record_id: str, user_agent: str) -> dict:
    """Open a new-version draft of an already-published record.

    Returns the draft body (its ``id`` differs from *record_id*).
    """
    resp = requests.post(
        f"{base_url}/records/{record_id}/versions",
        headers=get_headers(token, user_agent),
    )
    resp.raise_for_status()
    new_draft = resp.json()
    print(f"Created new version draft: {new_draft['id']} (from {record_id})")
    return new_draft
def delete_draft_files(base_url: str, token: str, record_id: str, user_agent: str) -> None:
    """Remove every file currently attached to a draft.

    Used before re-uploading in a new version, since files carry over
    from the previous version. Raises HTTPError on any failed API call.
    """
    headers = get_headers(token, user_agent)
    response = requests.get(
        f"{base_url}/records/{record_id}/draft/files",
        headers=headers,
    )
    response.raise_for_status()
    files = response.json()["entries"]

    if files:
        print(f"Deleting {len(files)} existing files from draft...")
        for f in files:
            filename = f["key"]
            # The file key is part of the deletion URL.
            delete_url = f"{base_url}/records/{record_id}/draft/files/{filename}"
            delete_response = requests.delete(delete_url, headers=headers)
            delete_response.raise_for_status()
            print(f"  Deleted: {filename}")
def update_draft_metadata(base_url: str, token: str, record_id: str, metadata: dict, user_agent: str) -> None:
    """Replace the draft's metadata with the given payload.

    Prints the status and body on a non-2xx response before raising.
    """
    resp = requests.put(
        f"{base_url}/records/{record_id}/draft",
        headers=get_headers(token, user_agent, "application/json"),
        json=metadata,
    )
    if not resp.ok:
        print(f"Error updating metadata: {resp.status_code}")
        print(f"Response: {resp.text}")
        resp.raise_for_status()
    print(f"Metadata updated for draft {record_id}")
def submit_community_review(base_url: str, token: str, record_id: str, community_id: str, user_agent: str) -> None:
    """Attach the draft to a community and submit it for review.

    Two steps: declare the community as the review receiver, then trigger
    the submit-review action. Raises HTTPError if either step fails.
    """
    headers = get_headers(token, user_agent, "application/json")

    review_resp = requests.put(
        f"{base_url}/records/{record_id}/draft/review",
        headers=headers,
        json={"receiver": {"community": community_id}, "type": "community-submission"},
    )
    if not review_resp.ok:
        print(f"Error submitting community review: {review_resp.status_code}")
        print(f"Response: {review_resp.text}")
        review_resp.raise_for_status()

    submit_resp = requests.post(
        f"{base_url}/records/{record_id}/draft/actions/submit-review",
        headers=headers,
        json={"body": "", "format": "html"},
    )
    submit_resp.raise_for_status()
    print(f"Submitted review for community {community_id}")
def publish_draft(base_url: str, token: str, record_id: str, user_agent: str) -> dict:
    """Publish the draft and return the published record body.

    Prints the status and body on a non-2xx response before raising.
    """
    resp = requests.post(
        f"{base_url}/records/{record_id}/draft/actions/publish",
        headers=get_headers(token, user_agent),
    )
    if not resp.ok:
        print(f"Error publishing draft: {resp.status_code}")
        print(f"Response: {resp.text}")
        resp.raise_for_status()
    record = resp.json()
    print(f"Published: {record['links']['self_html']}")
    return record
def text_to_html(text: str) -> str:
    """Render plain text as minimal HTML.

    Blank-line-separated blocks become paragraphs (inner newlines turn
    into ``<br>``). A block whose first line starts with "- " becomes a
    ``<ul>``; non-bullet lines inside such a block are dropped.
    """
    rendered = []
    for block in text.strip().split("\n\n"):
        rows = block.strip().split("\n")
        if rows[0].strip().startswith("- "):
            bullets = "".join(
                f"<li>{row.strip()[2:]}</li>"
                for row in rows
                if row.strip().startswith("- ")
            )
            rendered.append(f"<ul>{bullets}</ul>")
        else:
            rendered.append("<p>" + "<br>".join(rows) + "</p>")
    return "".join(rendered)
def build_inveniordm_payload(config: dict) -> dict:
    """Map the YAML config onto an InvenioRDM record-creation payload.

    Required keys: title, resource_type, creators, publication_date,
    access. Description fields are converted from plain text to HTML;
    other known optional fields pass through verbatim.
    """
    meta = {
        "title": config["title"],
        "resource_type": config["resource_type"],
        "creators": config["creators"],
        "publication_date": config["publication_date"],
    }

    if "description" in config:
        meta["description"] = text_to_html(config["description"])

    if "additional_descriptions" in config:
        meta["additional_descriptions"] = [
            {"description": text_to_html(entry["description"]), "type": entry["type"]}
            for entry in config["additional_descriptions"]
        ]

    # Optional fields copied through unchanged when present.
    optional = ("subjects", "languages", "dates", "related_identifiers",
                "rights", "contributors", "funding",
                "version", "locations", "identifiers")
    meta.update({key: config[key] for key in optional if key in config})

    meta["publisher"] = config.get("publisher", "Zenodo")

    if "references" in config:
        # Bare strings become {"reference": ...}; anything else passes through.
        meta["references"] = [
            {"reference": ref} if isinstance(ref, str) else ref
            for ref in config["references"]
        ]

    return {
        "access": config["access"],
        "files": {"enabled": True},
        "metadata": meta,
    }
def main(config_file: str, publish: bool = False) -> dict:
    """Upload a record to Zenodo/InvenioRDM driven by a YAML config file.

    Creates a new draft (or, when the config has ``record_id``, a new
    version of that record), uploads every file listed under ``files``,
    optionally submits the record to a community, and publishes it when
    *publish* is True.

    Returns the published record body when publishing, else the draft body.
    """
    with open(config_file) as f:
        config = yaml.safe_load(f)

    base_url = config["zenodo_url"].rstrip("/")
    token = config["access_token"]
    user_agent = config["user_agent"]
    record_id = config.get("record_id")  # present -> new version of an existing record
    community = config.get("community")

    payload = build_inveniordm_payload(config)

    if record_id:
        # New version: files carry over from the previous version, so
        # clear them before re-uploading, then refresh the metadata.
        draft = create_new_version(base_url, token, record_id, user_agent)
        draft_id = draft["id"]
        delete_draft_files(base_url, token, draft_id, user_agent)
        update_draft_metadata(base_url, token, draft_id, payload, user_agent)
    else:
        draft = create_draft(base_url, token, user_agent, payload)
        draft_id = draft["id"]

    print(f"Draft ID: {draft_id}")
    print(f"Files to upload: {len(config['files'])}")

    for file_path in config["files"]:
        upload_file_with_retry(base_url, draft_id, file_path, token, user_agent)

    # Community review is only submitted for brand-new records on
    # production (skipped on sandbox URLs and for new versions).
    if community and "sandbox" not in base_url and not record_id:
        submit_community_review(base_url, token, draft_id, community, user_agent)

    if publish:
        published = publish_draft(base_url, token, draft_id, user_agent)
        return published
    else:
        print(f"\nDraft ready for review: {base_url.replace('/api', '')}/uploads/{draft_id}")
        print("Run with --publish to publish automatically")
        return draft
if __name__ == "__main__":  # pragma: no cover
    # CLI entry point: python on_zenodo.py <config.yaml> [--publish]
    parser = argparse.ArgumentParser()
    parser.add_argument("config_file")
    parser.add_argument("--publish", action="store_true", help="Publish after upload")
    args = parser.parse_args()
    main(args.config_file, args.publish)