Coverage for crowdsourcing / zenodo_utils.py: 100%
24 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 14:31 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-21 14:31 +0000
1# SPDX-FileCopyrightText: 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import os
6from typing import Tuple
8import requests
11def get_zenodo_token() -> str:
12 """Get the appropriate Zenodo token based on environment."""
13 environment = os.environ.get("ENVIRONMENT", "development")
14 if environment == "development":
15 token = os.environ.get("ZENODO_SANDBOX")
16 if not token:
17 raise ValueError("ZENODO_SANDBOX token not found in environment")
18 return token
19 else:
20 token = os.environ.get("ZENODO_PRODUCTION")
21 if not token:
22 raise ValueError("ZENODO_PRODUCTION token not found in environment")
23 return token
26def get_zenodo_base_url() -> str:
27 """Get the appropriate Zenodo API base URL based on environment."""
28 environment = os.environ.get("ENVIRONMENT", "development")
29 return (
30 "https://sandbox.zenodo.org/api"
31 if environment == "development"
32 else "https://zenodo.org/api"
33 )
36def create_deposition_resource(
37 date: str, metadata: dict, base_url: str = None
38) -> Tuple[str, str]:
39 """Create a new deposition resource on Zenodo.
41 Args:
42 date: The publication date in ISO format (YYYY-MM-DD)
43 metadata: The metadata for the deposition (will be wrapped in {"metadata": metadata})
44 base_url: The Zenodo API base URL (optional, defaults to environment-based URL)
46 Returns:
47 Tuple containing:
48 - str: The deposition ID
49 - str: The bucket URL for file uploads
50 """
51 headers = {"Content-Type": "application/json"}
53 # Zenodo API expects metadata wrapped in a "metadata" key
54 payload = {"metadata": metadata}
56 response = requests.post(
57 f"{base_url}/deposit/depositions",
58 params={"access_token": get_zenodo_token()},
59 json=payload,
60 headers=headers,
61 timeout=30,
62 )
64 response.raise_for_status()
65 data = response.json()
67 return data["id"], data["links"]["bucket"]