Coverage for src / piccione / download / from_sharepoint.py: 87%
180 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-27 20:21 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-27 20:21 +0000
1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5from __future__ import annotations
7import argparse
8import json
9import time
10from contextlib import contextmanager
11from dataclasses import dataclass, field
12from datetime import datetime, timezone
13from pathlib import Path
14from typing import TYPE_CHECKING, TypedDict, cast
16if TYPE_CHECKING:
17 from collections.abc import Generator
19import httpx
20import yaml
21from rich.console import Console
22from rich.progress import (
23 BarColumn,
24 Progress,
25 TaskID,
26 TaskProgressColumn,
27 TextColumn,
28 TimeRemainingColumn,
29)
31console = Console()
class SharePointFolder(TypedDict):
    """Folder entry taken from the ``Folders`` listing of a SharePoint folder."""

    # Folder name; used as the key of the corresponding subfolder node.
    Name: str
    # Path handed back to the folder API to list this folder's own contents.
    ServerRelativeUrl: str
class SharePointFile(TypedDict):
    """File entry taken from the ``Files`` listing of a SharePoint folder."""

    # File name; used as the key of the file inside its folder node.
    Name: str
    # File size, delivered as a string and converted with ``int()`` by the scanner.
    Length: str
    # Last-modification timestamp, later parsed as an ISO 8601 string.
    TimeLastModified: str
    # Entity tag of the file, stored verbatim in the metadata.
    ETag: str
class SharePointConfig(TypedDict):
    """Shape of the YAML configuration file consumed by this script."""

    # Absolute URL of the SharePoint site; API endpoints are appended to it.
    site_url: str
    # Value sent as the ``FedAuth`` cookie.
    fedauth: str
    # Value sent as the ``rtFa`` cookie.
    rtfa: str
    # Folder paths, relative to the site, that are scanned and downloaded.
    folders: list[str]
@dataclass
class FileMetadata:
    """Remote attributes of one file, as recorded in the structure tree."""

    # Size in bytes, compared with the local file size before downloading.
    size: int
    # Last-modification timestamp as an ISO 8601 string.
    modified: str
    # Entity tag reported by the server.
    etag: str
@dataclass
class FolderNode:
    """One folder of the remote tree: its subfolders and the files it holds.

    The JSON form produced by :meth:`to_dict` maps each subfolder name to its
    own serialized node and stores the files under the reserved ``"_files"``
    key; :meth:`from_dict` reverses that mapping.
    """

    subfolders: dict[str, FolderNode] = field(default_factory=dict)
    files: dict[str, FileMetadata] = field(default_factory=dict)

    def to_dict(self) -> dict[str, object]:
        """Serialize this node and everything below it to plain dictionaries."""
        serialized: dict[str, object] = {
            folder_name: subtree.to_dict() for folder_name, subtree in self.subfolders.items()
        }
        if not self.files:
            # A folder without files carries no "_files" entry at all.
            return serialized
        serialized["_files"] = {
            filename: {"size": info.size, "modified": info.modified, "etag": info.etag}
            for filename, info in self.files.items()
        }
        return serialized

    @staticmethod
    def _parse_file_entry(filename: str, meta_raw: object) -> FileMetadata:
        """Validate one ``"_files"`` entry and convert it to ``FileMetadata``.

        Raises:
            TypeError: If the entry is not a dict or a field has the wrong type.
            KeyError: If ``size``, ``modified`` or ``etag`` is missing.
        """
        if not isinstance(meta_raw, dict):
            msg = f"Expected dict for file {filename!r}, got {type(meta_raw).__name__}"
            raise TypeError(msg)
        # All three keys are looked up before any type check, so a missing key
        # is reported ahead of a wrongly typed one.
        size = meta_raw["size"]
        modified = meta_raw["modified"]
        etag = meta_raw["etag"]
        if not isinstance(size, int):
            msg = f"Expected int for size of {filename!r}, got {type(size).__name__}"
            raise TypeError(msg)
        if not isinstance(modified, str):
            msg = f"Expected str for modified of {filename!r}, got {type(modified).__name__}"
            raise TypeError(msg)
        if not isinstance(etag, str):
            msg = f"Expected str for etag of {filename!r}, got {type(etag).__name__}"
            raise TypeError(msg)
        return FileMetadata(size=size, modified=modified, etag=etag)

    @staticmethod
    def from_dict(data: dict[str, object]) -> FolderNode:
        """Rebuild a node tree from the dictionaries produced by :meth:`to_dict`.

        Raises:
            TypeError: If any value is not a dict, or a file entry is malformed.
            KeyError: If a file entry lacks one of its fields.
        """
        tree = FolderNode()
        for key, value in data.items():
            if not isinstance(value, dict):
                msg = f"Expected dict for key {key!r}, got {type(value).__name__}"
                raise TypeError(msg)
            if key != "_files":
                # Every other key names a subfolder, rebuilt recursively.
                tree.subfolders[key] = FolderNode.from_dict(value)
                continue
            for filename, meta_raw in value.items():
                tree.files[filename] = FolderNode._parse_file_entry(filename, meta_raw)
        return tree
def load_config(config_path: str | Path) -> SharePointConfig:
    """Read the YAML configuration file and return its parsed content.

    Args:
        config_path: Location of the YAML file.

    Returns:
        The parsed document, typed as ``SharePointConfig``. The ``cast`` only
        informs type checkers; the content is not validated here.
    """
    # Decode explicitly as UTF-8 instead of the platform's locale encoding, so
    # non-ASCII folder names are read identically on every system.
    with Path(config_path).open(encoding="utf-8") as f:
        return cast("SharePointConfig", yaml.safe_load(f))
def get_site_relative_url(site_url: str) -> str:
    """Return the path portion of ``site_url`` with a leading slash.

    The URL is split on ``/`` after removing trailing slashes; the first three
    pieces (scheme, the empty piece between the two slashes, and the host for
    an absolute URL) are dropped and the remainder is joined back together.
    A URL without any path therefore yields ``"/"``.
    """
    pieces = site_url.rstrip("/").split("/")
    path_pieces = pieces[3:]
    return f"/{'/'.join(path_pieces)}"
def sort_structure(node: FolderNode) -> FolderNode:
    """Return a copy of ``node`` whose subfolders and files are ordered by name.

    The ordering is applied recursively to every subfolder; the input tree is
    left untouched.
    """
    ordered = FolderNode()
    for folder_name in sorted(node.subfolders):
        ordered.subfolders[folder_name] = sort_structure(node.subfolders[folder_name])
    for filename in sorted(node.files):
        ordered.files[filename] = node.files[filename]
    return ordered
# Status code the server uses to signal throttling.
HTTP_TOO_MANY_REQUESTS = 429
# Lowest status code treated as an error by the streaming helper.
HTTP_BAD_REQUEST = 400


def request_with_retry(client: httpx.Client, url: str, max_retries: int = 5) -> httpx.Response:  # pragma: no cover
    """GET ``url``, retrying while the server answers 429.

    Between attempts the function waits for the number of seconds given in the
    response's ``Retry-After`` header when that header holds an integer, and
    falls back to exponential backoff (1, 2, 4, ... seconds) otherwise.

    Args:
        client: HTTP client used to issue the request.
        url: Address to fetch.
        max_retries: Maximum number of attempts.

    Returns:
        The first response that is not a 429, after ``raise_for_status()``.

    Raises:
        RuntimeError: If every attempt was answered with 429.
    """
    for attempt in range(max_retries):
        resp = client.get(url)
        if resp.status_code != HTTP_TOO_MANY_REQUESTS:
            resp.raise_for_status()
            return resp
        if attempt == max_retries - 1:
            # No further request follows, so waiting would only delay the error.
            break
        try:
            # Prefer the delay requested by the server.
            wait_time = max(0, int(resp.headers.get("Retry-After")))
        except (TypeError, ValueError):
            # Header absent or not a plain number of seconds.
            wait_time = 2**attempt
        time.sleep(wait_time)
    msg = f"Rate limited after {max_retries} retries for {url}"
    raise RuntimeError(msg)
@contextmanager
def stream_with_retry(  # pragma: no cover
    client: httpx.Client,
    url: str,
    max_retries: int = 5,
) -> Generator[httpx.Response, None, None]:
    """Open a streaming GET on ``url``, retrying while the server answers 429.

    The yielded response stays open for the duration of the caller's ``with``
    block. A throttled response is closed before waiting. The wait is the
    integer number of seconds in its ``Retry-After`` header when present,
    otherwise exponential backoff (1, 2, 4, ... seconds).

    Args:
        client: HTTP client used to open the stream.
        url: Address to fetch.
        max_retries: Maximum number of attempts.

    Yields:
        The first response that is neither a 429 nor an error status.

    Raises:
        RuntimeError: If every attempt was answered with 429.
    """
    for attempt in range(max_retries):
        with client.stream("GET", url) as resp:
            if resp.status_code != HTTP_TOO_MANY_REQUESTS:
                if resp.status_code >= HTTP_BAD_REQUEST:
                    resp.raise_for_status()
                yield resp
                return
            retry_after = resp.headers.get("Retry-After")
        # The throttled response is closed at this point, so no connection is
        # held open while waiting.
        if attempt == max_retries - 1:
            # No further request follows, so waiting would only delay the error.
            break
        try:
            wait_time = max(0, int(retry_after))
        except (TypeError, ValueError):
            # Header absent or not a plain number of seconds.
            wait_time = 2**attempt
        time.sleep(wait_time)
    msg = f"Rate limited after {max_retries} retries for {url}"
    raise RuntimeError(msg)
def get_folder_contents(
    client: httpx.Client,
    site_url: str,
    folder_path: str,
) -> tuple[list[SharePointFolder], list[SharePointFile]]:
    """List the direct subfolders and files of one folder.

    Args:
        client: HTTP client carrying the authentication and ``Accept`` headers.
        site_url: Absolute URL of the site; the API path is appended to it.
        folder_path: Server-relative path of the folder to list.

    Returns:
        A ``(folders, files)`` pair taken from the ``d.results`` arrays of the
        two API responses.
    """
    # The path is embedded in a single-quoted literal; a quote inside it has to
    # be doubled or the literal ends early and the request is malformed.
    quoted_path = folder_path.replace("'", "''")
    api_url = f"{site_url}/_api/web/GetFolderByServerRelativeUrl('{quoted_path}')"

    folders_resp = request_with_retry(client, f"{api_url}/Folders")
    folders_data = folders_resp.json()["d"]["results"]

    files_resp = request_with_retry(client, f"{api_url}/Files")
    files_data = files_resp.json()["d"]["results"]

    return folders_data, files_data
def get_folder_structure(client: httpx.Client, site_url: str, folder_path: str) -> FolderNode:
    """Recursively scan ``folder_path`` and return it as a ``FolderNode`` tree.

    Subfolders whose name starts with ``_`` or equals ``Forms`` are left out
    (presumably SharePoint system folders; confirm against the target site).
    """
    subfolder_entries, file_entries = get_folder_contents(client, site_url, folder_path)
    tree = FolderNode()

    for entry in subfolder_entries:
        folder_name = entry["Name"]
        skip = folder_name.startswith("_") or folder_name == "Forms"
        if not skip:
            tree.subfolders[folder_name] = get_folder_structure(client, site_url, entry["ServerRelativeUrl"])

    for entry in file_entries:
        # The size arrives as a string and is stored as an integer.
        metadata = FileMetadata(
            size=int(entry["Length"]),
            modified=entry["TimeLastModified"],
            etag=entry["ETag"],
        )
        tree.files[entry["Name"]] = metadata

    return tree
def process_folder(
    client: httpx.Client,
    folder_path: str,
    site_url: str,
    progress: Progress,
    task_id: TaskID,
) -> tuple[str, str, FolderNode]:
    """Scan one configured folder while reporting on the progress display.

    Returns:
        A ``(folder_name, folder_path, structure)`` triple, where
        ``folder_name`` is the text after the last ``/`` of ``folder_path``.
    """
    # Everything after the last slash; the whole string when there is no slash.
    _, _, folder_name = folder_path.rpartition("/")
    progress.update(task_id, description=f"Scanning {folder_name}...")
    scanned = get_folder_structure(client, site_url, folder_path)
    progress.advance(task_id)
    return folder_name, folder_path, scanned
def extract_structure(
    client: httpx.Client,
    site_url: str,
    folders: list[str],
    progress: Progress,
) -> tuple[dict[str, FolderNode], dict[str, str]]:
    """Scan every configured folder and return the sorted structure.

    Args:
        client: HTTP client used for the listing requests.
        site_url: Absolute URL of the site.
        folders: Folder paths relative to the site; leading and trailing
            slashes are optional.
        progress: Progress display that receives one task covering all folders.

    Returns:
        A ``(structure, folder_paths)`` pair, both keyed by folder name and
        ordered by ``(name, path)``. When two configured folders share the same
        name, both mappings keep the entry with the greatest path, so they
        always describe the same folder.
    """
    # Drop the trailing slash so that a site without a path ("/") does not
    # produce a doubled slash when the folder is appended.
    site_prefix = get_site_relative_url(site_url).rstrip("/")

    task_id = progress.add_task("Discovering...", total=len(folders))

    results = []
    for folder in folders:
        # Normalize to exactly one leading slash and no trailing slash; a
        # trailing slash would otherwise yield an empty folder name.
        folder_path = f"{site_prefix}/{folder.strip('/')}"
        results.append(process_folder(client, folder_path, site_url, progress, task_id))

    # Sort on name and path only: comparing whole tuples would fall through to
    # the FolderNode element for duplicate entries, which defines no ordering.
    results.sort(key=lambda item: (item[0], item[1]))

    # Both mappings are built from the same ordered list so that duplicate
    # names resolve to the same entry in each.
    structure = {name: sort_structure(folder_structure) for name, _, folder_structure in results}
    folder_paths = {name: path for name, path, _ in results}
    return structure, folder_paths
def collect_files_from_structure(
    structure: dict[str, FolderNode],
    folder_paths: dict[str, str],
) -> list[tuple[str, str, FileMetadata]]:
    """Flatten the structure into ``(server_path, local_path, metadata)`` triples.

    ``local_path`` starts with the top-level folder name; ``server_path`` is the
    same relative path appended to the parent of that folder's server path.
    Within each folder, files come first and subfolders follow, depth first.
    """

    def walk(node: FolderNode, rel_dir: str, server_root: str) -> Generator[tuple[str, str, FileMetadata], None, None]:
        # Prefix shared by every entry of this folder; empty at an unnamed root.
        prefix = f"{rel_dir}/" if rel_dir else ""
        for filename, metadata in node.files.items():
            yield f"{server_root}/{prefix}{filename}", f"{prefix}{filename}", metadata
        for child_name, child in node.subfolders.items():
            yield from walk(child, f"{prefix}{child_name}", server_root)

    collected: list[tuple[str, str, FileMetadata]] = []
    for folder_name, folder_node in structure.items():
        # Parent directory of the top-level folder on the server.
        server_root = folder_paths[folder_name].rsplit("/", 1)[0]
        collected.extend(walk(folder_node, folder_name, server_root))
    return collected
def should_download(remote_meta: FileMetadata, local_path: Path) -> bool:
    """Decide whether the remote file has to be fetched.

    Returns:
        ``True`` when the local file is missing, its size differs from the
        remote size, or its modification time is older than the remote one.
    """
    if not local_path.exists():
        return True

    info = local_path.stat()
    local_mtime = datetime.fromtimestamp(info.st_mtime, tz=timezone.utc)
    # A trailing "Z" is rewritten as an explicit UTC offset before parsing.
    remote_mtime = datetime.fromisoformat(remote_meta.modified.replace("Z", "+00:00"))

    if info.st_size != remote_meta.size:
        return True
    return local_mtime < remote_mtime
def download_file(client: httpx.Client, site_url: str, file_server_relative_url: str, local_path: Path) -> int:
    """Stream one remote file to ``local_path``.

    Args:
        client: HTTP client carrying the authentication headers.
        site_url: Absolute URL of the site; the API path is appended to it.
        file_server_relative_url: Server-relative path of the file.
        local_path: Destination file; missing parent directories are created.

    Returns:
        The size in bytes of the written file.
    """
    # The path is embedded in a single-quoted literal; a quote inside it has to
    # be doubled or the literal ends early and the request is malformed.
    quoted_path = file_server_relative_url.replace("'", "''")
    url = f"{site_url}/_api/web/GetFileByServerRelativeUrl('{quoted_path}')/$value"

    local_path.parent.mkdir(parents=True, exist_ok=True)

    # The stream is entered first, so a failed request does not touch the
    # destination file.
    with stream_with_retry(client, url) as response, local_path.open("wb") as f:
        f.writelines(response.iter_bytes(chunk_size=8192))

    return local_path.stat().st_size
def collect_all_remote_paths(structure: dict[str, FolderNode], folder_paths: dict[str, str]) -> set[Path]:
    """Return the local relative path of every remote file as a set of ``Path``."""
    entries = collect_files_from_structure(structure, folder_paths)
    # Each entry is (server_path, local_path, metadata); only the local path is kept.
    return {Path(entry[1]) for entry in entries}
def remove_orphans(output_dir: Path, remote_paths: set[Path]) -> int:
    """Delete local files that are absent from ``remote_paths``.

    Every regular file below ``output_dir`` is considered, except files named
    ``structure.json``. Paths in ``remote_paths`` are relative to
    ``output_dir``.

    Returns:
        The number of files that were deleted.
    """
    present: set[Path] = set()
    for candidate in output_dir.rglob("*"):
        if not candidate.is_file() or candidate.name == "structure.json":
            continue
        present.add(candidate.relative_to(output_dir))

    removed = 0
    for orphan in present - remote_paths:
        (output_dir / orphan).unlink()
        console.print(f"[yellow]Removed: {orphan}")
        removed += 1
    return removed
def download_all_files(
    client: httpx.Client,
    site_url: str,
    structure: dict[str, FolderNode],
    output_dir: Path,
    folder_paths: dict[str, str],
) -> None:
    """Download every file that is missing or outdated, then prune orphans.

    A failure on one file is printed and counted without stopping the run.
    Local files that no longer exist remotely are deleted afterwards, and a
    one-line summary of all counters is printed.
    """
    pending = collect_files_from_structure(structure, folder_paths)
    tally = {"downloaded": 0, "updated": 0, "skipped": 0, "failed": 0}

    columns = (
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeRemainingColumn(),
    )
    with Progress(*columns, console=console) as progress:
        task_id = progress.add_task("Downloading...", total=len(pending))

        for server_path, local_rel_path, metadata in pending:
            local_path = output_dir / local_rel_path
            progress.update(task_id, description=f"[cyan]{local_rel_path}")

            if not should_download(metadata, local_path):
                outcome = "skipped"
            else:
                try:
                    # An existing file means this download is an update.
                    outcome = "updated" if local_path.exists() else "downloaded"
                    download_file(client, site_url, server_path, local_path)
                except Exception as e:  # noqa: BLE001
                    # Best effort: report the failure and carry on with the rest.
                    console.print(f"[red]Failed: {local_rel_path} ({e})")
                    outcome = "failed"

            tally[outcome] += 1
            progress.advance(task_id)

    removed = remove_orphans(output_dir, collect_all_remote_paths(structure, folder_paths))

    downloaded = tally["downloaded"]
    updated = tally["updated"]
    skipped = tally["skipped"]
    failed = tally["failed"]
    console.print(
        f"Downloaded: {downloaded}, Updated: {updated}, Skipped: {skipped}, Failed: {failed}, Removed: {removed}",
    )
def main() -> None:  # pragma: no cover
    """Command-line entry point.

    Phase 1 obtains the folder structure, either by loading the JSON file given
    with ``--structure`` or by scanning the site and saving the result to
    ``structure.json`` inside the output directory. Phase 2 downloads the
    files, unless ``--structure-only`` was given after a fresh scan.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("config", type=Path)
    parser.add_argument("output_dir", type=Path)
    parser.add_argument("--structure-only", action="store_true")
    parser.add_argument("--structure", type=Path, help="Path to existing structure JSON file")
    args = parser.parse_args()

    config = load_config(args.config)

    site_url = config["site_url"]
    fedauth = config["fedauth"]
    rtfa = config["rtfa"]
    folders = config["folders"]

    if args.structure:
        console.print("[bold blue][Phase 1][/] Loading structure from file...")
        # Read as UTF-8, matching the encoding used when the file is written.
        with args.structure.open(encoding="utf-8") as f:
            data = json.load(f)
        structure = {name: FolderNode.from_dict(d) for name, d in data["structure"].items()}
        folder_paths = data["folder_paths"]
        console.print(f"Loaded structure from {args.structure}")
    else:
        console.print("[bold blue][Phase 1][/] Discovering files...")
        json_headers = {
            "Cookie": f"FedAuth={fedauth}; rtFa={rtfa}",
            "Accept": "application/json;odata=verbose",
        }

        with (
            Progress(
                TextColumn("[progress.description]{task.description}"),
                BarColumn(),
                TaskProgressColumn(),
                console=console,
            ) as progress,
            httpx.Client(headers=json_headers) as client,
        ):
            structure, folder_paths = extract_structure(client, site_url, folders, progress)

        args.output_dir.mkdir(parents=True, exist_ok=True)

        structure_file = args.output_dir / "structure.json"
        output = {
            "site_url": site_url,
            "extracted_at": datetime.now(timezone.utc).isoformat(),
            "structure": {name: node.to_dict() for name, node in structure.items()},
            "folder_paths": folder_paths,
        }
        # ensure_ascii=False emits non-ASCII names verbatim, so the file must be
        # written as UTF-8; the locale default encoding may be unable to encode
        # them.
        with structure_file.open("w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)
        console.print(f"Structure saved to {structure_file}")

        if args.structure_only:
            return

    console.print("[bold blue][Phase 2][/] Downloading files...")

    args.output_dir.mkdir(parents=True, exist_ok=True)

    download_headers = {
        "Cookie": f"FedAuth={fedauth}; rtFa={rtfa}",
    }

    # Generous timeout (in seconds) for the file downloads.
    with httpx.Client(headers=download_headers, timeout=300) as client:
        download_all_files(client, site_url, structure, args.output_dir, folder_paths)


if __name__ == "__main__":  # pragma: no cover
    main()