Coverage for src/piccione/download/from_sharepoint.py: 100%
110 statements
coverage.py v7.13.0, created at 2025-12-11 13:41 +0000
import argparse
import json
import time
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path

import httpx
import yaml
from rich.console import Console
from rich.progress import (
    BarColumn,
    Progress,
    TaskProgressColumn,
    TextColumn,
    TimeRemainingColumn,
)

console = Console()


def load_config(config_path):
    with open(config_path) as f:
        return yaml.safe_load(f)


def get_site_relative_url(site_url):
    return "/" + "/".join(site_url.rstrip("/").split("/")[3:])


def sort_structure(obj):
    if isinstance(obj, dict):
        sorted_dict = {}
        for key in sorted(obj.keys(), key=lambda k: (k == "_files", k)):
            sorted_dict[key] = sort_structure(obj[key])
        return sorted_dict
    elif isinstance(obj, list):
        return sorted(obj)
    return obj
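# Illustrative example (hypothetical data): the sort key (k == "_files", k)
# orders sibling keys alphabetically but always places the reserved "_files"
# key last, and file lists themselves are sorted, e.g.
#
#   sort_structure({"_files": ["b.txt", "a.txt"], "Reports": {}, "Archive": {}})
#   == {"Archive": {}, "Reports": {}, "_files": ["a.txt", "b.txt"]}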
def request_with_retry(client, url, max_retries=5):  # pragma: no cover
    for attempt in range(max_retries):
        resp = client.get(url)
        if resp.status_code == 429:
            wait_time = 2**attempt
            time.sleep(wait_time)
            continue
        resp.raise_for_status()
        return resp
    raise Exception(f"Rate limited after {max_retries} retries for {url}")


@contextmanager
def stream_with_retry(client, url, max_retries=5):  # pragma: no cover
    for attempt in range(max_retries):
        with client.stream("GET", url) as resp:
            if resp.status_code == 429:
                wait_time = 2**attempt
                time.sleep(wait_time)
                continue
            if resp.status_code >= 400:
                resp.raise_for_status()
            yield resp
            return
    raise Exception(f"Rate limited after {max_retries} retries for {url}")


def get_folder_contents(client, site_url, folder_path):
    api_url = f"{site_url}/_api/web/GetFolderByServerRelativeUrl('{folder_path}')"

    folders_resp = request_with_retry(client, f"{api_url}/Folders")
    folders_data = folders_resp.json()["d"]["results"]

    files_resp = request_with_retry(client, f"{api_url}/Files")
    files_data = files_resp.json()["d"]["results"]

    return folders_data, files_data


def get_folder_structure(client, site_url, folder_path):
    result = {}

    folders, files = get_folder_contents(client, site_url, folder_path)

    for folder in folders:
        name = folder["Name"]
        if name.startswith("_") or name == "Forms":
            continue
        subfolder_path = folder["ServerRelativeUrl"]
        result[name] = get_folder_structure(client, site_url, subfolder_path)

    file_list = [f["Name"] for f in files]
    if file_list:
        result["_files"] = file_list

    return result
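# Illustrative example (folder and file names are hypothetical): for a library
# containing "Reports/2024/summary.pdf" and a top-level "readme.txt",
# get_folder_structure returns a nested dict in which each subfolder maps to
# its own subtree and file names are gathered under the reserved "_files" key:
#
#   {
#       "Reports": {"2024": {"_files": ["summary.pdf"]}},
#       "_files": ["readme.txt"],
#   }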
def process_folder(client, folder_path, site_url, progress, task_id):
    folder_name = folder_path.split("/")[-1]
    progress.update(task_id, description=f"Scanning {folder_name}...")
    structure = get_folder_structure(client, site_url, folder_path)
    progress.advance(task_id)
    return folder_name, folder_path, structure


def extract_structure(client, site_url, folders, progress):
    site_relative_url = get_site_relative_url(site_url)

    task_id = progress.add_task("Discovering...", total=len(folders))

    results = []
    for folder in folders:
        normalized = folder if folder.startswith("/") else "/" + folder
        folder_path = site_relative_url + normalized
        result = process_folder(client, folder_path, site_url, progress, task_id)
        results.append(result)

    structure = {name: folder_structure for name, _, folder_structure in results}
    folder_paths = {name: path for name, path, _ in results}
    return sort_structure(structure), folder_paths


def collect_files_from_structure(structure, folder_paths):
    files = []

    def traverse(node, current_path, base_server_path):
        for key, value in node.items():
            if key == "_files":
                for filename in value:
                    server_path = (
                        f"{base_server_path}/{current_path}/{filename}"
                        if current_path
                        else f"{base_server_path}/{filename}"
                    )
                    local_path = f"{current_path}/{filename}" if current_path else filename
                    files.append((server_path, local_path))
            elif isinstance(value, dict):
                new_path = f"{current_path}/{key}" if current_path else key
                traverse(value, new_path, base_server_path)

    for folder_name, folder_structure in structure.items():
        base_path = folder_paths[folder_name]
        traverse({folder_name: folder_structure}, "", base_path.rsplit("/", 1)[0])

    return files
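# Illustrative example (all paths are hypothetical): given
#
#   structure    = {"Reports": {"2024": {"_files": ["summary.pdf"]}}}
#   folder_paths = {"Reports": "/sites/team/Shared Documents/Reports"}
#
# collect_files_from_structure returns (server_relative_url, local_relative_path)
# pairs such as:
#
#   [("/sites/team/Shared Documents/Reports/2024/summary.pdf",
#     "Reports/2024/summary.pdf")]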
def download_file(client, site_url, file_server_relative_url, local_path):
    url = f"{site_url}/_api/web/GetFileByServerRelativeUrl('{file_server_relative_url}')/$value"

    local_path.parent.mkdir(parents=True, exist_ok=True)

    with stream_with_retry(client, url) as response:
        with open(local_path, "wb") as f:
            for chunk in response.iter_bytes(chunk_size=8192):
                f.write(chunk)

    return local_path.stat().st_size


def download_all_files(client, site_url, structure, output_dir, folder_paths):
    files = collect_files_from_structure(structure, folder_paths)
    total = len(files)

    downloaded = 0
    skipped = 0
    failed = 0

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeRemainingColumn(),
        console=console,
    ) as progress:
        task_id = progress.add_task("Downloading...", total=total)

        for server_path, local_rel_path in files:
            local_path = output_dir / local_rel_path
            progress.update(task_id, description=f"[cyan]{local_rel_path}")

            if local_path.exists():
                skipped += 1
                progress.advance(task_id)
                continue

            try:
                download_file(client, site_url, server_path, local_path)
                downloaded += 1
            except Exception as e:
                console.print(f"[red]Failed: {local_rel_path} ({e})")
                failed += 1

            progress.advance(task_id)

    console.print(f"Downloaded: {downloaded}, Skipped: {skipped}, Failed: {failed}")


def main():  # pragma: no cover
    parser = argparse.ArgumentParser()
    parser.add_argument("config", type=Path)
    parser.add_argument("output_dir", type=Path)
    parser.add_argument("--structure-only", action="store_true")
    parser.add_argument("--structure", type=Path, help="Path to existing structure JSON file")
    args = parser.parse_args()

    config = load_config(args.config)

    site_url = config["site_url"]
    fedauth = config["fedauth"]
    rtfa = config["rtfa"]
    folders = config["folders"]

    if args.structure:
        console.print("[bold blue][Phase 1][/] Loading structure from file...")
        with open(args.structure) as f:
            data = json.load(f)
        structure = data["structure"]
        folder_paths = data["folder_paths"]
        console.print(f"Loaded structure from {args.structure}")
    else:
        console.print("[bold blue][Phase 1][/] Discovering files...")
        json_headers = {
            "Cookie": f"FedAuth={fedauth}; rtFa={rtfa}",
            "Accept": "application/json;odata=verbose",
        }

        with Progress(
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            console=console,
        ) as progress:
            with httpx.Client(headers=json_headers) as client:
                structure, folder_paths = extract_structure(client, site_url, folders, progress)

        args.output_dir.mkdir(parents=True, exist_ok=True)

        structure_file = args.output_dir / "structure.json"
        output = {
            "site_url": site_url,
            "extracted_at": datetime.now(timezone.utc).isoformat(),
            "structure": structure,
            "folder_paths": folder_paths,
        }
        with open(structure_file, "w") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)
        console.print(f"Structure saved to {structure_file}")

    if args.structure_only:
        return

    console.print("[bold blue][Phase 2][/] Downloading files...")

    args.output_dir.mkdir(parents=True, exist_ok=True)

    download_headers = {
        "Cookie": f"FedAuth={fedauth}; rtFa={rtfa}",
    }

    with httpx.Client(headers=download_headers, timeout=300) as client:
        download_all_files(client, site_url, structure, args.output_dir, folder_paths)


if __name__ == "__main__":  # pragma: no cover
    main()
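# Example usage (illustrative; the host name, folder name, cookie values, and
# invocation path are placeholders rather than values taken from this project).
# main() expects the YAML config to provide site_url, fedauth, rtfa, and folders:
#
#   site_url: https://contoso.sharepoint.com/sites/team
#   fedauth: <FedAuth cookie value>
#   rtfa: <rtFa cookie value>
#   folders:
#     - Shared Documents/Reports
#
# Discover the folder tree and write structure.json only:
#   python src/piccione/download/from_sharepoint.py config.yaml output/ --structure-only
#
# Download files, reusing a previously saved structure:
#   python src/piccione/download/from_sharepoint.py config.yaml output/ --structure output/structure.json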