Coverage for src / piccione / upload / on_zenodo.py: 77%
164 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-28 16:52 +0000
1import argparse
2import time
3from pathlib import Path
5import requests
6import yaml
7from rich.progress import Progress, BarColumn, DownloadColumn, TransferSpeedColumn, TimeRemainingColumn
10def get_headers(token: str, user_agent: str, content_type: str | None = None) -> dict[str, str]:
11 headers = {
12 "Authorization": f"Bearer {token}",
13 "User-Agent": user_agent,
14 }
15 if content_type:
16 headers["Content-Type"] = content_type
17 return headers
class ProgressFileWrapper:
    """File-like wrapper that advances a rich progress task as it is read.

    requests can stream an instance as a request body: ``read`` feeds the
    bytes through while updating the progress bar, and ``__len__`` reports
    the total file size so Content-Length can be set up front.
    """

    def __init__(self, file_path: str, progress: "Progress", task_id: int):
        self.file_path = file_path
        self.file_size = Path(file_path).stat().st_size
        self.fp = open(file_path, "rb")
        self.progress = progress
        self.task_id = task_id

    def read(self, size: int = -1) -> bytes:
        chunk = self.fp.read(size)
        # Advance by however many bytes were actually produced (may be
        # fewer than requested at EOF).
        self.progress.update(self.task_id, advance=len(chunk))
        return chunk

    def __len__(self) -> int:
        # Used by requests to compute Content-Length for the upload.
        return self.file_size

    def close(self) -> None:
        self.fp.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()
def upload_file_with_retry(base_url: str, record_id: str, file_path: str, token: str, user_agent: str) -> None:
    """Upload a single file to a draft record, retrying on network errors.

    Three API calls per attempt: register the file key on the draft,
    stream the file content (with a rich progress bar), then commit it.
    Timeouts and connection errors trigger an exponential-backoff retry
    (capped at 60s between attempts); HTTP errors are re-raised as fatal.

    NOTE(fix): the coverage-report extraction had replaced the f-string
    interpolations here with literal "(unknown)"; the `{filename}` segments
    in the content/commit URLs and log messages are restored.
    """
    filename = Path(file_path).name
    file_size = Path(file_path).stat().st_size
    files_url = f"{base_url}/records/{record_id}/draft/files"

    attempt = 0
    while True:
        attempt += 1
        try:
            print(f"\nAttempt {attempt}: {filename}")

            # Register the file entry on the draft.
            response = requests.post(
                files_url,
                headers=get_headers(token, user_agent, "application/json"),
                json=[{"key": filename}],
            )
            response.raise_for_status()

            # Stream the content with a progress bar.
            content_url = f"{files_url}/{filename}/content"
            with Progress(
                "[progress.description]{task.description}",
                BarColumn(),
                DownloadColumn(),
                TransferSpeedColumn(),
                TimeRemainingColumn(),
            ) as progress:
                task_id = progress.add_task(filename, total=file_size)
                with ProgressFileWrapper(file_path, progress, task_id) as wrapper:
                    response = requests.put(
                        content_url,
                        data=wrapper,
                        headers=get_headers(token, user_agent, "application/octet-stream"),
                        timeout=(30, 3600),  # 30s connect, 1h read
                    )
                    response.raise_for_status()

            # Finalize the upload.
            commit_url = f"{files_url}/{filename}/commit"
            response = requests.post(commit_url, headers=get_headers(token, user_agent))
            response.raise_for_status()

            print(f"[OK] {filename} uploaded successfully")
            break
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            print(f"[ERROR] Network error: {e}")
            wait = min(2 ** (attempt - 1), 60)  # exponential backoff, capped at 60s
            print(f"Retrying in {wait}s...")
            time.sleep(wait)
        except requests.exceptions.HTTPError as e:
            print(f"[ERROR] HTTP error: {e}")
            raise
def create_draft(base_url: str, token: str, user_agent: str, metadata: dict) -> dict:
    """Create a brand-new draft record and return its JSON representation.

    On a non-2xx response the server's status and body are printed before
    the HTTPError is raised, to aid debugging metadata problems.
    """
    response = requests.post(
        f"{base_url}/records",
        headers=get_headers(token, user_agent, "application/json"),
        json=metadata,
    )
    if not response.ok:
        print(f"Error creating draft: {response.status_code}")
        print(f"Response: {response.text}")
    # Only raises for non-2xx statuses, so this is a no-op on success.
    response.raise_for_status()
    draft = response.json()
    print(f"Created new draft: {draft['id']}")
    return draft
def create_new_version(base_url: str, token: str, record_id: str, user_agent: str) -> dict:
    """Start a new-version draft from an existing published record.

    Returns the new draft's JSON (its ``id`` differs from ``record_id``).
    """
    response = requests.post(
        f"{base_url}/records/{record_id}/versions",
        headers=get_headers(token, user_agent),
    )
    response.raise_for_status()
    draft = response.json()
    print(f"Created new version draft: {draft['id']} (from {record_id})")
    return draft
def delete_draft_files(base_url: str, token: str, record_id: str, user_agent: str) -> None:
    """Remove every file currently attached to a draft record.

    Used when creating a new version, so files inherited from the previous
    version are not carried over before fresh uploads.

    NOTE(fix): the coverage-report extraction had replaced the f-string
    interpolations with literal "(unknown)"; the `{filename}` segments in
    the delete URL and log message are restored.
    """
    headers = get_headers(token, user_agent)
    response = requests.get(
        f"{base_url}/records/{record_id}/draft/files",
        headers=headers,
    )
    response.raise_for_status()
    files = response.json()["entries"]

    if files:
        print(f"Deleting {len(files)} existing files from draft...")
        for f in files:
            filename = f["key"]
            delete_url = f"{base_url}/records/{record_id}/draft/files/{filename}"
            delete_response = requests.delete(delete_url, headers=headers)
            delete_response.raise_for_status()
            print(f" Deleted: {filename}")
def update_draft_metadata(base_url: str, token: str, record_id: str, metadata: dict, user_agent: str) -> None:
    """Replace a draft's metadata with the given InvenioRDM payload.

    Prints the server's status and body on failure before raising.
    """
    draft_url = f"{base_url}/records/{record_id}/draft"
    response = requests.put(
        draft_url,
        headers=get_headers(token, user_agent, "application/json"),
        json=metadata,
    )
    if not response.ok:
        print(f"Error updating metadata: {response.status_code}")
        print(f"Response: {response.text}")
    # Only raises for non-2xx statuses, so this is a no-op on success.
    response.raise_for_status()
    print(f"Metadata updated for draft {record_id}")
def submit_community_review(base_url: str, token: str, record_id: str, community_id: str, user_agent: str) -> None:
    """Attach a draft to a community and submit it for review.

    Two-step InvenioRDM flow: PUT the review request on the draft, then
    POST the submit-review action (with an empty HTML comment body).
    """
    headers = get_headers(token, user_agent, "application/json")

    review_payload = {"receiver": {"community": community_id}, "type": "community-submission"}
    resp = requests.put(
        f"{base_url}/records/{record_id}/draft/review",
        headers=headers,
        json=review_payload,
    )
    resp.raise_for_status()

    resp = requests.post(
        f"{base_url}/records/{record_id}/draft/actions/submit-review",
        headers=headers,
        json={"body": "", "format": "html"},
    )
    resp.raise_for_status()
    print(f"Submitted review for community {community_id}")
def publish_draft(base_url: str, token: str, record_id: str, user_agent: str) -> dict:
    """Publish a draft record and return the published record's JSON.

    Prints the server's status and body on failure before raising.
    """
    response = requests.post(
        f"{base_url}/records/{record_id}/draft/actions/publish",
        headers=get_headers(token, user_agent),
    )
    if not response.ok:
        print(f"Error publishing draft: {response.status_code}")
        print(f"Response: {response.text}")
    # Only raises for non-2xx statuses, so this is a no-op on success.
    response.raise_for_status()
    published = response.json()
    print(f"Published: {published['links']['self_html']}")
    return published
def text_to_html(text: str) -> str:
    """Convert plain text to minimal HTML.

    Blank-line-separated paragraphs become ``<p>`` blocks (internal
    newlines become ``<br>``). A paragraph whose first line starts with
    "- " is rendered as a ``<ul>``, keeping only the "- " lines as items.
    """
    def render(paragraph: str) -> str:
        rows = paragraph.strip().split("\n")
        if rows[0].strip().startswith("- "):
            bullets = "".join(
                f"<li>{row.strip()[2:]}</li>"
                for row in rows
                if row.strip().startswith("- ")
            )
            return f"<ul>{bullets}</ul>"
        return "<p>" + "<br>".join(rows) + "</p>"

    return "".join(render(p) for p in text.strip().split("\n\n"))
def build_inveniordm_payload(config: dict) -> dict:
    """Translate the YAML config dict into an InvenioRDM record payload.

    Required fields (title, resource_type, creators, publication_date) are
    copied directly; description fields are converted from plain text to
    HTML; a fixed set of optional metadata fields is passed through
    verbatim when present in the config.
    """
    metadata = {
        "title": config["title"],
        "resource_type": config["resource_type"],
        "creators": config["creators"],
        "publication_date": config["publication_date"],
    }

    if "description" in config:
        metadata["description"] = text_to_html(config["description"])

    if "additional_descriptions" in config:
        metadata["additional_descriptions"] = [
            {"description": text_to_html(item["description"]), "type": item["type"]}
            for item in config["additional_descriptions"]
        ]

    # Optional fields copied as-is when the config provides them.
    passthrough = ("subjects", "languages", "dates", "related_identifiers",
                   "rights", "contributors", "funding", "references",
                   "version", "locations", "identifiers", "publisher")
    metadata.update({key: config[key] for key in passthrough if key in config})

    return {
        "access": config["access"],
        "files": {"enabled": True},
        "metadata": metadata,
    }
def main(config_file: str, publish: bool = False) -> dict:
    """Run the full upload workflow described by a YAML config file.

    Creates a fresh draft (or a new version, when ``record_id`` is set in
    the config), uploads every listed file, optionally submits the draft
    to a community, and publishes when ``publish`` is True. Returns the
    draft JSON, or the published record JSON when publishing.
    """
    with open(config_file) as fh:
        config = yaml.safe_load(fh)

    base_url = config["zenodo_url"].rstrip("/")
    token = config["access_token"]
    user_agent = config["user_agent"]
    record_id = config.get("record_id")
    community = config.get("community")

    payload = build_inveniordm_payload(config)

    if record_id:
        # New version of an existing record: start from its draft, drop
        # the inherited files, then refresh the metadata.
        draft = create_new_version(base_url, token, record_id, user_agent)
        draft_id = draft["id"]
        delete_draft_files(base_url, token, draft_id, user_agent)
        update_draft_metadata(base_url, token, draft_id, payload, user_agent)
    else:
        draft = create_draft(base_url, token, user_agent, payload)
        draft_id = draft["id"]

    print(f"Draft ID: {draft_id}")
    print(f"Files to upload: {len(config['files'])}")

    for file_path in config["files"]:
        upload_file_with_retry(base_url, draft_id, file_path, token, user_agent)

    # Community review submission is skipped on the sandbox instance.
    if community and "sandbox" not in base_url:
        submit_community_review(base_url, token, draft_id, community, user_agent)

    if not publish:
        print(f"\nDraft ready for review: {base_url.replace('/api', '')}/uploads/{draft_id}")
        print("Run with --publish to publish automatically")
        return draft
    return publish_draft(base_url, token, draft_id, user_agent)
# CLI entry point: `python on_zenodo.py config.yml [--publish]`.
if __name__ == "__main__":  # pragma: no cover
    parser = argparse.ArgumentParser()
    # Path to the YAML config (see main() for the expected keys).
    parser.add_argument("config_file")
    parser.add_argument("--publish", action="store_true", help="Publish after upload")
    args = parser.parse_args()
    main(args.config_file, args.publish)