Coverage for src/piccione/download/from_sharepoint.py: 99% — 131 statements (coverage.py v7.13.0, created at 2026-03-21 11:49 +0000)
1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import json
7import time
8from contextlib import contextmanager
9from datetime import datetime, timezone
10from pathlib import Path
12import httpx
13import yaml
14from rich.console import Console
15from rich.progress import (
16 BarColumn,
17 Progress,
18 TaskProgressColumn,
19 TextColumn,
20 TimeRemainingColumn,
21)
# Shared Rich console used for all user-facing output in this module.
console = Console()
def load_config(config_path):
    """Parse the YAML configuration file at *config_path* and return its contents."""
    with open(config_path) as handle:
        return yaml.safe_load(handle)
def get_site_relative_url(site_url):
    """Return the server-relative portion of a SharePoint site URL.

    E.g. "https://host/sites/team" -> "/sites/team".
    """
    # Skip scheme, empty authority slot, and host (the first three
    # "/"-separated segments of an absolute URL).
    segments = site_url.rstrip("/").split("/")
    return "/" + "/".join(segments[3:])
def sort_structure(obj):
    """Recursively order a nested structure for stable serialization.

    Dict keys are sorted alphabetically with "_files" forced last; lists
    are sorted; any other value is returned unchanged.
    """
    if isinstance(obj, dict):
        ordered = sorted(obj, key=lambda k: (k == "_files", k))
        return {key: sort_structure(obj[key]) for key in ordered}
    if isinstance(obj, list):
        return sorted(obj)
    return obj
def request_with_retry(client, url, max_retries=5):  # pragma: no cover
    """GET *url* via *client*, retrying with exponential backoff on HTTP 429.

    Raises for non-2xx statuses, and raises Exception after *max_retries*
    consecutive rate-limit responses.
    """
    attempt = 0
    while attempt < max_retries:
        response = client.get(url)
        if response.status_code != 429:
            response.raise_for_status()
            return response
        # Rate limited: back off exponentially before the next attempt.
        time.sleep(2**attempt)
        attempt += 1
    raise Exception(f"Rate limited after {max_retries} retries for {url}")
@contextmanager
def stream_with_retry(client, url, max_retries=5):  # pragma: no cover
    """Context manager yielding a streaming GET response for *url*.

    Retries with exponential backoff on HTTP 429; raises for other 4xx/5xx
    statuses, and raises Exception once *max_retries* is exhausted.
    """
    for attempt in range(max_retries):
        with client.stream("GET", url) as response:
            if response.status_code == 429:
                # Rate limited: close this stream, wait, and retry.
                time.sleep(2**attempt)
                continue
            if response.status_code >= 400:
                response.raise_for_status()
            yield response
            return
    raise Exception(f"Rate limited after {max_retries} retries for {url}")
def get_folder_contents(client, site_url, folder_path):
    """Fetch one level of a SharePoint folder via the REST API.

    Returns a (subfolders, files) pair of result lists from the
    GetFolderByServerRelativeUrl endpoint.
    """
    base = f"{site_url}/_api/web/GetFolderByServerRelativeUrl('{folder_path}')"
    subfolders = request_with_retry(client, f"{base}/Folders").json()["d"]["results"]
    files = request_with_retry(client, f"{base}/Files").json()["d"]["results"]
    return subfolders, files
def get_folder_structure(client, site_url, folder_path):
    """Recursively map a SharePoint folder into nested dicts.

    Subfolders become nested keys; the folder's files go under a "_files"
    key mapping each file name to its size/modified/etag metadata. Folders
    whose names start with "_" and the "Forms" system folder are skipped.
    """
    subfolders, files = get_folder_contents(client, site_url, folder_path)

    tree = {}
    for subfolder in subfolders:
        subfolder_name = subfolder["Name"]
        if subfolder_name == "Forms" or subfolder_name.startswith("_"):
            continue
        tree[subfolder_name] = get_folder_structure(
            client, site_url, subfolder["ServerRelativeUrl"]
        )

    if files:
        file_index = {}
        for entry in files:
            file_index[entry["Name"]] = {
                "size": int(entry["Length"]),
                "modified": entry["TimeLastModified"],
                "etag": entry["ETag"],
            }
        tree["_files"] = file_index

    return tree
def process_folder(client, folder_path, site_url, progress, task_id):
    """Scan one top-level folder, advancing the progress bar.

    Returns (folder name, server-relative path, nested structure).
    """
    name = folder_path.rsplit("/", 1)[-1]
    progress.update(task_id, description=f"Scanning {name}...")
    tree = get_folder_structure(client, site_url, folder_path)
    progress.advance(task_id)
    return name, folder_path, tree
def extract_structure(client, site_url, folders, progress):
    """Scan every configured folder under the site.

    Returns (sorted nested structure keyed by folder name, mapping of
    folder name to its server-relative path).
    """
    site_prefix = get_site_relative_url(site_url)
    task_id = progress.add_task("Discovering...", total=len(folders))

    scanned = []
    for entry in folders:
        # Accept folder entries with or without a leading slash.
        suffix = entry if entry.startswith("/") else "/" + entry
        scanned.append(
            process_folder(client, site_prefix + suffix, site_url, progress, task_id)
        )

    structure = {}
    folder_paths = {}
    for name, path, tree in scanned:
        structure[name] = tree
        folder_paths[name] = path

    return sort_structure(structure), folder_paths
def collect_files_from_structure(structure, folder_paths):
    """Flatten the nested folder structure into a list of downloadable files.

    Returns a list of (server_path, local_path, metadata) tuples, where
    server_path is the file's server-relative URL, local_path is the path
    (relative to the output directory) the file should be written to, and
    metadata is the size/modified/etag dict from the structure.
    """
    files = []

    def traverse(node, current_path, base_server_path):
        for key, value in node.items():
            if key == "_files":
                for filename, metadata in value.items():
                    # BUG FIX: these paths previously interpolated the
                    # literal text "(unknown)" instead of the file name
                    # (the `else filename` branch shows the intent).
                    if current_path:
                        server_path = f"{base_server_path}/{current_path}/{filename}"
                        local_path = f"{current_path}/{filename}"
                    else:
                        server_path = f"{base_server_path}/{filename}"
                        local_path = filename
                    files.append((server_path, local_path, metadata))
            elif isinstance(value, dict):
                new_path = f"{current_path}/{key}" if current_path else key
                traverse(value, new_path, base_server_path)

    for folder_name, folder_structure in structure.items():
        # Base is the parent of the folder's server path; the folder name
        # itself re-enters via the traversal's current_path.
        base_path = folder_paths[folder_name]
        traverse({folder_name: folder_structure}, "", base_path.rsplit("/", 1)[0])

    return files
def should_download(remote_meta, local_path):
    """Decide whether a remote file needs (re)downloading.

    True when the local copy is missing, differs in size, or was modified
    before the remote timestamp.
    """
    if not local_path.exists():
        return True
    stat = local_path.stat()
    local_modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
    # SharePoint returns ISO timestamps with a "Z" suffix, which
    # datetime.fromisoformat does not accept on older Pythons.
    remote_modified = datetime.fromisoformat(
        remote_meta["modified"].replace("Z", "+00:00")
    )
    if stat.st_size != remote_meta["size"]:
        return True
    return local_modified < remote_modified
def download_file(client, site_url, file_server_relative_url, local_path):
    """Stream a single SharePoint file to *local_path*; return bytes written."""
    download_url = (
        f"{site_url}/_api/web/GetFileByServerRelativeUrl("
        f"'{file_server_relative_url}')/$value"
    )

    local_path.parent.mkdir(parents=True, exist_ok=True)

    with stream_with_retry(client, download_url) as response:
        with open(local_path, "wb") as out:
            for chunk in response.iter_bytes(chunk_size=8192):
                out.write(chunk)

    return local_path.stat().st_size
def collect_all_remote_paths(structure, folder_paths):
    """Return the set of expected local Paths for every remote file."""
    entries = collect_files_from_structure(structure, folder_paths)
    return {Path(local) for _, local, _ in entries}
def remove_orphans(output_dir, remote_paths):
    """Delete local files that no longer exist remotely; return the count.

    The saved structure.json snapshot is always preserved.
    """
    present = {
        candidate.relative_to(output_dir)
        for candidate in output_dir.rglob("*")
        if candidate.is_file() and candidate.name != "structure.json"
    }
    stale = present - remote_paths
    for path in stale:
        (output_dir / path).unlink()
        console.print(f"[yellow]Removed: {path}")
    return len(stale)
def download_all_files(client, site_url, structure, output_dir, folder_paths):
    """Download every file described by *structure* into *output_dir*.

    Skips up-to-date local copies, reports per-file failures without
    aborting, removes local files that vanished remotely, and prints a
    summary of the counts.
    """
    file_list = collect_files_from_structure(structure, folder_paths)

    counts = {"downloaded": 0, "updated": 0, "skipped": 0, "failed": 0}

    progress_columns = (
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeRemainingColumn(),
    )
    with Progress(*progress_columns, console=console) as progress:
        task_id = progress.add_task("Downloading...", total=len(file_list))

        for server_path, local_rel_path, metadata in file_list:
            target = output_dir / local_rel_path
            progress.update(task_id, description=f"[cyan]{local_rel_path}")

            if should_download(metadata, target):
                try:
                    existed_before = target.exists()
                    download_file(client, site_url, server_path, target)
                except Exception as e:
                    console.print(f"[red]Failed: {local_rel_path} ({e})")
                    counts["failed"] += 1
                else:
                    counts["updated" if existed_before else "downloaded"] += 1
            else:
                counts["skipped"] += 1

            progress.advance(task_id)

    # Clean up anything that no longer exists on the remote side.
    removed = remove_orphans(
        output_dir, collect_all_remote_paths(structure, folder_paths)
    )

    console.print(
        f"Downloaded: {counts['downloaded']}, Updated: {counts['updated']}, "
        f"Skipped: {counts['skipped']}, Failed: {counts['failed']}, Removed: {removed}"
    )
def main():  # pragma: no cover
    """CLI entry point.

    Phase 1 discovers the remote folder structure (or loads a previously
    saved snapshot via --structure); Phase 2 downloads the files, unless
    --structure-only was given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("config", type=Path)
    parser.add_argument("output_dir", type=Path)
    parser.add_argument("--structure-only", action="store_true")
    parser.add_argument("--structure", type=Path, help="Path to existing structure JSON file")
    args = parser.parse_args()

    config = load_config(args.config)
    site_url = config["site_url"]
    fedauth = config["fedauth"]
    rtfa = config["rtfa"]
    folders = config["folders"]

    # Both API phases authenticate with the same SharePoint cookie pair.
    cookie_header = f"FedAuth={fedauth}; rtFa={rtfa}"

    if args.structure:
        console.print("[bold blue][Phase 1][/] Loading structure from file...")
        with open(args.structure) as f:
            saved = json.load(f)
        structure = saved["structure"]
        folder_paths = saved["folder_paths"]
        console.print(f"Loaded structure from {args.structure}")
    else:
        console.print("[bold blue][Phase 1][/] Discovering files...")
        json_headers = {
            "Cookie": cookie_header,
            "Accept": "application/json;odata=verbose",
        }
        discovery_columns = (
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
        )
        with Progress(*discovery_columns, console=console) as progress:
            with httpx.Client(headers=json_headers) as client:
                structure, folder_paths = extract_structure(client, site_url, folders, progress)

        # Persist the discovered structure so later runs can reuse it.
        args.output_dir.mkdir(parents=True, exist_ok=True)
        structure_file = args.output_dir / "structure.json"
        snapshot = {
            "site_url": site_url,
            "extracted_at": datetime.now(timezone.utc).isoformat(),
            "structure": structure,
            "folder_paths": folder_paths,
        }
        with open(structure_file, "w") as f:
            json.dump(snapshot, f, indent=2, ensure_ascii=False)
        console.print(f"Structure saved to {structure_file}")

    if args.structure_only:
        return

    console.print("[bold blue][Phase 2][/] Downloading files...")

    args.output_dir.mkdir(parents=True, exist_ok=True)

    # Long timeout: individual file downloads can be large.
    with httpx.Client(headers={"Cookie": cookie_header}, timeout=300) as client:
        download_all_files(client, site_url, structure, args.output_dir, folder_paths)
# Script entry point guard.
if __name__ == "__main__":  # pragma: no cover
    main()