Coverage for src / piccione / download / from_sharepoint.py: 87%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-27 20:21 +0000

1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5from __future__ import annotations 

6 

7import argparse 

8import json 

9import time 

10from contextlib import contextmanager 

11from dataclasses import dataclass, field 

12from datetime import datetime, timezone 

13from pathlib import Path 

14from typing import TYPE_CHECKING, TypedDict, cast 

15 

16if TYPE_CHECKING: 

17 from collections.abc import Generator 

18 

19import httpx 

20import yaml 

21from rich.console import Console 

22from rich.progress import ( 

23 BarColumn, 

24 Progress, 

25 TaskID, 

26 TaskProgressColumn, 

27 TextColumn, 

28 TimeRemainingColumn, 

29) 

30 

# Module-wide Rich console; every status line and progress bar in this file is written through it.
console = Console()

32 

33 

class SharePointFolder(TypedDict):
    """Folder entry as returned in the ``Folders`` listing of a SharePoint folder.

    Only the keys this module reads are declared.
    """

    # Display name of the folder; used as the key in the local tree.
    Name: str
    # Server-relative path that is passed back to the API to list the folder's own contents.
    ServerRelativeUrl: str

37 

38 

class SharePointFile(TypedDict):
    """File entry as returned in the ``Files`` listing of a SharePoint folder.

    Only the keys this module reads are declared.
    """

    # File name, used as the key inside a folder node.
    Name: str
    # File size; arrives as a string and is converted with ``int()`` by the caller.
    Length: str
    # Last-modification timestamp, stored verbatim in the file metadata.
    TimeLastModified: str
    # Entity tag, stored verbatim in the file metadata.
    ETag: str

44 

45 

class SharePointConfig(TypedDict):
    """Shape of the YAML configuration file consumed by the command-line entry point."""

    # Absolute URL of the SharePoint site.
    site_url: str
    # Value sent as the ``FedAuth`` cookie.
    fedauth: str
    # Value sent as the ``rtFa`` cookie.
    rtfa: str
    # Folders to mirror, given relative to the site.
    folders: list[str]

51 

52 

@dataclass
class FileMetadata:
    """Remote attributes of one file, used to decide whether a local copy is stale."""

    # Size in bytes.
    size: int
    # Last-modification timestamp as an ISO-8601 string, exactly as received.
    modified: str
    # Entity tag, exactly as received.
    etag: str

58 

59 

@dataclass
class FolderNode:
    """One folder of the mirrored tree: its child folders and the files it contains.

    The JSON form produced by :meth:`to_dict` maps each subfolder name to its
    own nested dict and stores the files under the reserved ``"_files"`` key.
    """

    subfolders: dict[str, FolderNode] = field(default_factory=dict)
    files: dict[str, FileMetadata] = field(default_factory=dict)

    def to_dict(self) -> dict[str, object]:
        """Return the JSON-serialisable form of this node and everything below it.

        The ``"_files"`` key is emitted only when the folder holds at least one file.
        """
        result: dict[str, object] = {name: child.to_dict() for name, child in self.subfolders.items()}
        if self.files:
            result["_files"] = {
                name: {"size": meta.size, "modified": meta.modified, "etag": meta.etag}
                for name, meta in self.files.items()
            }
        return result

    @classmethod
    def from_dict(cls, data: dict[str, object]) -> FolderNode:
        """Rebuild a node (recursively) from the structure produced by :meth:`to_dict`.

        Args:
            data: Mapping of subfolder names to nested mappings, plus an
                optional ``"_files"`` mapping of file names to metadata.

        Raises:
            TypeError: If any value is not a dict, or a metadata field has the wrong type.
            KeyError: If a file entry lacks ``size``, ``modified`` or ``etag``.
        """
        # A classmethod (rather than a staticmethod) so subclasses rebuild instances of themselves.
        node = cls()
        for key, value in data.items():
            if not isinstance(value, dict):
                msg = f"Expected dict for key {key!r}, got {type(value).__name__}"
                raise TypeError(msg)
            if key == "_files":
                for filename, meta_raw in value.items():
                    node.files[filename] = cls._parse_file_metadata(filename, meta_raw)
            else:
                node.subfolders[key] = cls.from_dict(value)
        return node

    @staticmethod
    def _parse_file_metadata(filename: str, meta_raw: object) -> FileMetadata:
        """Validate one raw ``"_files"`` entry and convert it to :class:`FileMetadata`."""
        if not isinstance(meta_raw, dict):
            msg = f"Expected dict for file {filename!r}, got {type(meta_raw).__name__}"
            raise TypeError(msg)
        size, modified, etag = meta_raw["size"], meta_raw["modified"], meta_raw["etag"]
        # bool is a subclass of int, so JSON true/false must be rejected explicitly.
        if isinstance(size, bool) or not isinstance(size, int):
            msg = f"Expected int for size of {filename!r}, got {type(size).__name__}"
            raise TypeError(msg)
        if not isinstance(modified, str):
            msg = f"Expected str for modified of {filename!r}, got {type(modified).__name__}"
            raise TypeError(msg)
        if not isinstance(etag, str):
            msg = f"Expected str for etag of {filename!r}, got {type(etag).__name__}"
            raise TypeError(msg)
        return FileMetadata(size=size, modified=modified, etag=etag)

102 

103 

def load_config(config_path: str | Path) -> SharePointConfig:
    """Read the YAML configuration file and return its content.

    Args:
        config_path: Location of the YAML file.

    Returns:
        The parsed document, typed as ``SharePointConfig``. The content is
        not validated here; missing keys surface when the caller reads them.
    """
    # Decode as UTF-8 explicitly: the platform default encoding may differ
    # (e.g. on Windows) and would corrupt non-ASCII folder names.
    with Path(config_path).open(encoding="utf-8") as f:
        return cast("SharePointConfig", yaml.safe_load(f))

107 

108 

def get_site_relative_url(site_url: str) -> str:
    """Return the server-relative part of an absolute site URL.

    Trailing slashes are ignored. Everything after the third ``/``-separated
    component (scheme, empty component, host) is kept and prefixed with ``/``.
    """
    components = site_url.rstrip("/").split("/")
    # components[0:3] are "https:", "" and the host name.
    path_components = components[3:]
    return "/" + "/".join(path_components)

111 

112 

def sort_structure(node: FolderNode) -> FolderNode:
    """Return a copy of ``node`` whose subfolders and files are ordered by name, recursively.

    The input node is left untouched; file metadata objects are shared, not copied.
    """
    ordered_children: dict[str, FolderNode] = {}
    for child_name in sorted(node.subfolders):
        ordered_children[child_name] = sort_structure(node.subfolders[child_name])

    ordered_files = {file_name: node.files[file_name] for file_name in sorted(node.files)}
    return FolderNode(subfolders=ordered_children, files=ordered_files)

117 

118 

# Status code that triggers a back-off-and-retry in the request helpers.
HTTP_TOO_MANY_REQUESTS = 429
# Lowest status code treated as an error by the streaming helper.
HTTP_BAD_REQUEST = 400

121 

122 

def request_with_retry(client: httpx.Client, url: str, max_retries: int = 5) -> httpx.Response:  # pragma: no cover
    """GET ``url``, retrying with exponential back-off while the server answers 429.

    Args:
        client: HTTP client used for the request.
        url: Absolute URL to fetch.
        max_retries: Maximum number of attempts.

    Returns:
        The first response that is not a 429.

    Raises:
        httpx.HTTPStatusError: If that response carries an error status.
        RuntimeError: If every attempt was answered with 429.
    """
    for attempt in range(max_retries):
        resp = client.get(url)
        if resp.status_code != HTTP_TOO_MANY_REQUESTS:
            resp.raise_for_status()
            return resp
        # Back off 1, 2, 4, ... seconds, but not after the final attempt:
        # there is nothing left to wait for before reporting the failure.
        if attempt < max_retries - 1:
            time.sleep(2**attempt)
    msg = f"Rate limited after {max_retries} retries for {url}"
    raise RuntimeError(msg)

134 

135 

@contextmanager
def stream_with_retry(  # pragma: no cover
    client: httpx.Client,
    url: str,
    max_retries: int = 5,
) -> Generator[httpx.Response, None, None]:
    """Open a streaming GET on ``url``, retrying with exponential back-off on 429.

    Used as a context manager; the yielded response stays open for the
    duration of the ``with`` body and is closed afterwards.

    Args:
        client: HTTP client used for the request.
        url: Absolute URL to fetch.
        max_retries: Maximum number of attempts.

    Raises:
        httpx.HTTPStatusError: If the response status is 400 or above (other than 429).
        RuntimeError: If every attempt was answered with 429.
    """
    for attempt in range(max_retries):
        with client.stream("GET", url) as resp:
            if resp.status_code != HTTP_TOO_MANY_REQUESTS:
                # Only statuses >= 400 are treated as errors here.
                if resp.status_code >= HTTP_BAD_REQUEST:
                    resp.raise_for_status()
                yield resp
                return
        # The throttled response is closed at this point, so the connection is
        # not held open while waiting. Skip the wait after the final attempt.
        if attempt < max_retries - 1:
            time.sleep(2**attempt)
    msg = f"Rate limited after {max_retries} retries for {url}"
    raise RuntimeError(msg)

154 

155 

def get_folder_contents(
    client: httpx.Client,
    site_url: str,
    folder_path: str,
) -> tuple[list[SharePointFolder], list[SharePointFile]]:
    """List the direct subfolders and files of one SharePoint folder.

    Args:
        client: HTTP client carrying the authentication and ``Accept`` headers.
        site_url: Absolute URL of the site.
        folder_path: Server-relative path of the folder to list.

    Returns:
        ``(folders, files)`` as found under ``d.results`` of the two JSON responses.
    """
    # The path is embedded in a single-quoted string literal, where a literal
    # quote must be doubled; otherwise a folder such as "O'Brien" breaks the request.
    quoted_path = folder_path.replace("'", "''")
    api_url = f"{site_url}/_api/web/GetFolderByServerRelativeUrl('{quoted_path}')"

    folders_resp = request_with_retry(client, f"{api_url}/Folders")
    folders_data = folders_resp.json()["d"]["results"]

    files_resp = request_with_retry(client, f"{api_url}/Files")
    files_data = files_resp.json()["d"]["results"]

    return folders_data, files_data

170 

171 

def get_folder_structure(client: httpx.Client, site_url: str, folder_path: str) -> FolderNode:
    """Recursively build the tree of folders and files rooted at ``folder_path``.

    Subfolders whose name starts with ``_`` or equals ``Forms`` are skipped.
    """
    subfolder_entries, file_entries = get_folder_contents(client, site_url, folder_path)
    tree = FolderNode()

    for entry in subfolder_entries:
        entry_name = entry["Name"]
        is_skipped = entry_name.startswith("_") or entry_name == "Forms"
        if not is_skipped:
            tree.subfolders[entry_name] = get_folder_structure(client, site_url, entry["ServerRelativeUrl"])

    tree.files.update(
        {
            entry["Name"]: FileMetadata(
                size=int(entry["Length"]),
                modified=entry["TimeLastModified"],
                etag=entry["ETag"],
            )
            for entry in file_entries
        },
    )

    return tree

191 

192 

def process_folder(
    client: httpx.Client,
    folder_path: str,
    site_url: str,
    progress: Progress,
    task_id: TaskID,
) -> tuple[str, str, FolderNode]:
    """Scan one configured folder and report it on the progress bar.

    Returns:
        ``(folder_name, folder_path, structure)`` where ``folder_name`` is the
        last ``/``-separated component of ``folder_path``.
    """
    # Everything after the last "/" (or the whole string when there is none).
    leaf_name = folder_path.rpartition("/")[2]
    progress.update(task_id, description=f"Scanning {leaf_name}...")
    tree = get_folder_structure(client, site_url, folder_path)
    progress.advance(task_id)
    return leaf_name, folder_path, tree

205 

206 

def extract_structure(
    client: httpx.Client,
    site_url: str,
    folders: list[str],
    progress: Progress,
) -> tuple[dict[str, FolderNode], dict[str, str]]:
    """Scan every configured folder and return the resulting trees and their server paths.

    Args:
        client: HTTP client carrying the authentication headers.
        site_url: Absolute URL of the site.
        folders: Folder paths relative to the site; leading and trailing
            slashes are optional.
        progress: Progress display that receives one task covering all folders.

    Returns:
        ``(structure, folder_paths)``: both are keyed by folder name (the last
        path component). ``structure`` holds the name-sorted tree of each
        folder, ``folder_paths`` its server-relative path.

    Note:
        Two configured folders with the same last path component share one
        key; the entry that sorts last by ``(name, path)`` wins in both dicts.
    """
    # Drop the trailing slash so a root-level site ("/") does not produce "//folder".
    site_relative_url = get_site_relative_url(site_url).rstrip("/")

    task_id = progress.add_task("Discovering...", total=len(folders))

    results = []
    for folder in folders:
        # Strip both ends: a trailing slash would otherwise yield an empty folder name.
        folder_path = f"{site_relative_url}/{folder.strip('/')}"
        results.append(process_folder(client, folder_path, site_url, progress, task_id))

    # Sort on the string fields only. Sorting the raw tuples falls through to
    # comparing FolderNode objects (which define no ordering) on a tie.
    ordered = sorted(results, key=lambda item: (item[0], item[1]))

    # Build both mappings from the same ordering so a duplicated name resolves
    # to the same folder in each of them.
    structure = {name: sort_structure(folder_structure) for name, _, folder_structure in ordered}
    folder_paths = {name: path for name, path, _ in ordered}
    return structure, folder_paths

227 

228 

def collect_files_from_structure(
    structure: dict[str, FolderNode],
    folder_paths: dict[str, str],
) -> list[tuple[str, str, FileMetadata]]:
    """Flatten the folder trees into one list of files.

    Returns:
        One ``(server_path, local_path, metadata)`` tuple per file. For each
        folder, its own files come first, followed by those of its subfolders.
        ``local_path`` starts with the top-level folder name; ``server_path``
        is the same path prefixed with the parent of that folder on the server.
    """

    def walk(
        node: FolderNode,
        relative_dir: str,
        server_root: str,
    ) -> Generator[tuple[str, str, FileMetadata], None, None]:
        # An empty relative_dir contributes no path component at all.
        prefix = f"{relative_dir}/" if relative_dir else ""
        for file_name, meta in node.files.items():
            yield f"{server_root}/{prefix}{file_name}", f"{prefix}{file_name}", meta
        for child_name, child in node.subfolders.items():
            yield from walk(child, f"{prefix}{child_name}", server_root)

    collected: list[tuple[str, str, FileMetadata]] = []
    for root_name, root_node in structure.items():
        # The stored path ends with the folder name itself; keep only its parent.
        parent_server_path = folder_paths[root_name].rsplit("/", 1)[0]
        collected.extend(walk(root_node, root_name, parent_server_path))
    return collected

251 

252 

def should_download(remote_meta: FileMetadata, local_path: Path) -> bool:
    """Tell whether the local copy is missing or out of date.

    Args:
        remote_meta: Size and modification time reported by the server.
        local_path: Where the file is (or would be) stored locally.

    Returns:
        ``True`` when the local file does not exist, its size differs from the
        remote size, or it was last modified before the remote file.
    """
    # One stat() call answers both "does it exist" and "what are its attributes",
    # which also avoids a race between an exists() check and a later stat().
    try:
        local_stat = local_path.stat()
    except (FileNotFoundError, NotADirectoryError):
        return True

    remote_mtime = datetime.fromisoformat(remote_meta.modified.replace("Z", "+00:00"))
    if remote_mtime.tzinfo is None:
        # A timestamp without offset cannot be compared with an aware one; read it as UTC.
        remote_mtime = remote_mtime.replace(tzinfo=timezone.utc)

    local_mtime = datetime.fromtimestamp(local_stat.st_mtime, tz=timezone.utc)
    return local_stat.st_size != remote_meta.size or local_mtime < remote_mtime

260 

261 

def download_file(client: httpx.Client, site_url: str, file_server_relative_url: str, local_path: Path) -> int:
    """Download one file from SharePoint to ``local_path``.

    The content is streamed into a sibling ``<name>.part`` file that replaces
    ``local_path`` only after the transfer completed, so an interrupted
    download never truncates an existing local copy.

    Args:
        client: HTTP client carrying the authentication headers.
        site_url: Absolute URL of the site.
        file_server_relative_url: Server-relative path of the file.
        local_path: Destination path; missing parent directories are created.

    Returns:
        Size in bytes of the file written.
    """
    # The path is embedded in a single-quoted string literal, where a literal
    # quote must be doubled.
    quoted_url = file_server_relative_url.replace("'", "''")
    url = f"{site_url}/_api/web/GetFileByServerRelativeUrl('{quoted_url}')/$value"

    local_path.parent.mkdir(parents=True, exist_ok=True)

    partial_path = local_path.with_name(f"{local_path.name}.part")
    try:
        with stream_with_retry(client, url) as response, partial_path.open("wb") as f:
            f.writelines(response.iter_bytes(chunk_size=8192))
        partial_path.replace(local_path)
    finally:
        # No-op after a successful replace; removes the leftover after a failure.
        partial_path.unlink(missing_ok=True)

    return local_path.stat().st_size

271 

272 

def collect_all_remote_paths(structure: dict[str, FolderNode], folder_paths: dict[str, str]) -> set[Path]:
    """Return the local relative path of every remote file as a set of ``Path`` objects."""
    remote_paths: set[Path] = set()
    for _server_path, relative_path, _metadata in collect_files_from_structure(structure, folder_paths):
        remote_paths.add(Path(relative_path))
    return remote_paths

275 

276 

def remove_orphans(output_dir: Path, remote_paths: set[Path]) -> int:
    """Delete local files that no longer exist remotely.

    Every file below ``output_dir`` whose relative path is not in
    ``remote_paths`` is removed, except files named ``structure.json``.
    Directories are left in place.

    Args:
        output_dir: Root of the local mirror.
        remote_paths: Paths, relative to ``output_dir``, that must be kept.

    Returns:
        Number of files removed.
    """
    local_files = {
        p.relative_to(output_dir) for p in output_dir.rglob("*") if p.is_file() and p.name != "structure.json"
    }
    # Sorted so the removal log is deterministic from run to run.
    orphans = sorted(local_files - remote_paths)
    for orphan in orphans:
        (output_dir / orphan).unlink()
        # markup=False: a file name containing "[...]" must be printed
        # literally instead of being parsed as console markup.
        console.print(f"Removed: {orphan}", style="yellow", markup=False)
    return len(orphans)

286 

287 

def download_all_files(
    client: httpx.Client,
    site_url: str,
    structure: dict[str, FolderNode],
    output_dir: Path,
    folder_paths: dict[str, str],
) -> None:
    """Synchronise ``output_dir`` with the remote structure and print a summary.

    Files that are missing or stale locally are downloaded, up-to-date files
    are skipped, and local files absent from the remote structure are removed.
    A failure on one file is reported and counted; the remaining files are
    still processed.

    Args:
        client: HTTP client carrying the authentication headers.
        site_url: Absolute URL of the site.
        structure: Folder trees keyed by top-level folder name.
        output_dir: Root of the local mirror.
        folder_paths: Server-relative path of each top-level folder.
    """
    # Local import: needed only here, to neutralise square brackets in file
    # names before they are interpolated into console markup.
    from rich.markup import escape

    files = collect_files_from_structure(structure, folder_paths)
    total = len(files)

    downloaded = 0
    updated = 0
    skipped = 0
    failed = 0

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeRemainingColumn(),
        console=console,
    ) as progress:
        task_id = progress.add_task("Downloading...", total=total)

        for server_path, local_rel_path, metadata in files:
            local_path = output_dir / local_rel_path
            progress.update(task_id, description=f"[cyan]{escape(local_rel_path)}")

            if not should_download(metadata, local_path):
                skipped += 1
                progress.advance(task_id)
                continue

            try:
                # Checked before downloading to tell a refresh from a first download.
                was_update = local_path.exists()
                download_file(client, site_url, server_path, local_path)
                if was_update:
                    updated += 1
                else:
                    downloaded += 1
            except Exception as e:  # noqa: BLE001
                # Deliberately broad: one failing file must not abort the whole run.
                # markup=False because both the path and the error text may contain brackets.
                console.print(f"Failed: {local_rel_path} ({e})", style="red", markup=False)
                failed += 1

            progress.advance(task_id)

    remote_paths = collect_all_remote_paths(structure, folder_paths)
    removed = remove_orphans(output_dir, remote_paths)

    console.print(
        f"Downloaded: {downloaded}, Updated: {updated}, Skipped: {skipped}, Failed: {failed}, Removed: {removed}",
    )

340 

341 

def main() -> None:  # pragma: no cover
    """Command-line entry point: discover the remote structure, then mirror it locally.

    Positional arguments are the YAML configuration file and the output
    directory. With ``--structure`` the tree is loaded from an existing JSON
    file instead of being discovered. Otherwise it is discovered, saved as
    ``structure.json`` in the output directory, and ``--structure-only``
    stops the run right after saving.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("config", type=Path)
    parser.add_argument("output_dir", type=Path)
    parser.add_argument("--structure-only", action="store_true")
    parser.add_argument("--structure", type=Path, help="Path to existing structure JSON file")
    args = parser.parse_args()

    config = load_config(args.config)

    site_url = config["site_url"]
    fedauth = config["fedauth"]
    rtfa = config["rtfa"]
    folders = config["folders"]

    if args.structure:
        console.print("[bold blue][Phase 1][/] Loading structure from file...")
        # Read with the same explicit encoding used when the file is written below.
        with args.structure.open(encoding="utf-8") as f:
            data = json.load(f)
        structure = {name: FolderNode.from_dict(d) for name, d in data["structure"].items()}
        folder_paths = data["folder_paths"]
        console.print(f"Loaded structure from {args.structure}")
    else:
        console.print("[bold blue][Phase 1][/] Discovering files...")
        json_headers = {
            "Cookie": f"FedAuth={fedauth}; rtFa={rtfa}",
            "Accept": "application/json;odata=verbose",
        }

        with (
            Progress(
                TextColumn("[progress.description]{task.description}"),
                BarColumn(),
                TaskProgressColumn(),
                console=console,
            ) as progress,
            httpx.Client(headers=json_headers) as client,
        ):
            structure, folder_paths = extract_structure(client, site_url, folders, progress)

        args.output_dir.mkdir(parents=True, exist_ok=True)

        structure_file = args.output_dir / "structure.json"
        output = {
            "site_url": site_url,
            "extracted_at": datetime.now(timezone.utc).isoformat(),
            "structure": {name: node.to_dict() for name, node in structure.items()},
            "folder_paths": folder_paths,
        }
        # ensure_ascii=False emits non-ASCII names verbatim, so the file must be
        # opened as UTF-8; the platform default encoding may be unable to encode them.
        with structure_file.open("w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)
        console.print(f"Structure saved to {structure_file}")

        if args.structure_only:
            return

    console.print("[bold blue][Phase 2][/] Downloading files...")

    # Still required here: the --structure branch above does not create the directory.
    args.output_dir.mkdir(parents=True, exist_ok=True)

    download_headers = {
        "Cookie": f"FedAuth={fedauth}; rtFa={rtfa}",
    }

    with httpx.Client(headers=download_headers, timeout=300) as client:
        download_all_files(client, site_url, structure, args.output_dir, folder_paths)

408 

409 

# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":  # pragma: no cover
    main()