Coverage for src / piccione / download / from_sharepoint.py: 99%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-02-28 16:52 +0000

1import argparse 

2import json 

3import time 

4from contextlib import contextmanager 

5from datetime import datetime, timezone 

6from pathlib import Path 

7 

8import httpx 

9import yaml 

10from rich.console import Console 

11from rich.progress import ( 

12 BarColumn, 

13 Progress, 

14 TaskProgressColumn, 

15 TextColumn, 

16 TimeRemainingColumn, 

17) 

18 

# Single shared Rich console used for all status and progress output in this module.
console = Console()

20 

21 

def load_config(config_path):
    """Parse the YAML configuration file at *config_path* and return its contents."""
    with open(config_path) as f:
        config = yaml.safe_load(f)
    return config

25 

26 

def get_site_relative_url(site_url):
    """Return the server-relative part of *site_url* (path after scheme and host)."""
    trimmed = site_url.rstrip("/")
    path_parts = trimmed.split("/")[3:]
    return "/" + "/".join(path_parts)

29 

30 

def sort_structure(obj):
    """Recursively sort a nested structure for stable output.

    Dict keys are sorted alphabetically with the special "_files" key forced
    last; lists are sorted; other values pass through unchanged.
    """
    if isinstance(obj, dict):
        ordered_keys = sorted(obj, key=lambda k: (k == "_files", k))
        return {k: sort_structure(obj[k]) for k in ordered_keys}
    if isinstance(obj, list):
        return sorted(obj)
    return obj

40 

41 

def request_with_retry(client, url, max_retries=5):  # pragma: no cover
    """GET *url* via *client*, retrying with exponential backoff on HTTP 429.

    Raises for any other error status via ``raise_for_status``; gives up with
    an Exception after *max_retries* rate-limited attempts.
    """
    attempt = 0
    while attempt < max_retries:
        response = client.get(url)
        if response.status_code != 429:
            response.raise_for_status()
            return response
        # Rate limited: back off 1, 2, 4, ... seconds before retrying.
        time.sleep(2**attempt)
        attempt += 1
    raise Exception(f"Rate limited after {max_retries} retries for {url}")

52 

53 

@contextmanager
def stream_with_retry(client, url, max_retries=5):  # pragma: no cover
    """Context manager streaming a GET of *url*, retrying with backoff on HTTP 429.

    Yields the open streaming response; raises for other 4xx/5xx statuses and
    with an Exception after *max_retries* rate-limited attempts.
    """
    attempt = 0
    while attempt < max_retries:
        with client.stream("GET", url) as response:
            if response.status_code == 429:
                # Back off while still inside the stream context, then retry.
                time.sleep(2**attempt)
                attempt += 1
                continue
            if response.status_code >= 400:
                response.raise_for_status()
            yield response
            return
    raise Exception(f"Rate limited after {max_retries} retries for {url}")

67 

68 

def get_folder_contents(client, site_url, folder_path):
    """Fetch one folder's children via the SharePoint REST API.

    Returns a tuple ``(folders, files)`` of raw result dicts from the
    ``/Folders`` and ``/Files`` endpoints.
    """
    base = f"{site_url}/_api/web/GetFolderByServerRelativeUrl('{folder_path}')"
    folders = request_with_retry(client, f"{base}/Folders").json()["d"]["results"]
    files = request_with_retry(client, f"{base}/Files").json()["d"]["results"]
    return folders, files

79 

80 

def get_folder_structure(client, site_url, folder_path):
    """Recursively describe the folder tree rooted at *folder_path*.

    Subfolders map name -> nested structure. If the folder has files they are
    collected under a "_files" key mapping file name -> {size, modified, etag}.
    Folders named "Forms" or starting with "_" (SharePoint system folders)
    are skipped.
    """
    tree = {}
    subfolders, files = get_folder_contents(client, site_url, folder_path)

    for sub in subfolders:
        sub_name = sub["Name"]
        if sub_name == "Forms" or sub_name.startswith("_"):
            continue
        tree[sub_name] = get_folder_structure(client, site_url, sub["ServerRelativeUrl"])

    if files:
        file_meta = {}
        for entry in files:
            file_meta[entry["Name"]] = {
                "size": int(entry["Length"]),
                "modified": entry["TimeLastModified"],
                "etag": entry["ETag"],
            }
        tree["_files"] = file_meta

    return tree

104 

105 

def process_folder(client, folder_path, site_url, progress, task_id):
    """Scan one top-level folder (updating *progress*) and return its structure.

    Returns ``(folder_name, folder_path, structure)``.
    """
    name = folder_path.rsplit("/", 1)[-1]
    progress.update(task_id, description=f"Scanning {name}...")
    tree = get_folder_structure(client, site_url, folder_path)
    progress.advance(task_id)
    return name, folder_path, tree

112 

113 

def extract_structure(client, site_url, folders, progress):
    """Scan every configured top-level folder on the site.

    Returns ``(structure, folder_paths)`` where *structure* is the sorted
    nested tree keyed by folder name and *folder_paths* maps each folder name
    to its server-relative path.
    """
    site_relative = get_site_relative_url(site_url)
    task_id = progress.add_task("Discovering...", total=len(folders))

    structure = {}
    folder_paths = {}
    for entry in folders:
        # Accept configured folders with or without a leading slash.
        suffix = entry if entry.startswith("/") else "/" + entry
        name, path, tree = process_folder(client, site_relative + suffix, site_url, progress, task_id)
        structure[name] = tree
        folder_paths[name] = path

    return sort_structure(structure), folder_paths

129 

130 

def collect_files_from_structure(structure, folder_paths):
    """Flatten the nested *structure* into a list of downloadable files.

    Returns a list of ``(server_path, local_relative_path, metadata)`` tuples,
    one per file found anywhere in the tree.

    Fix: the file name was previously dropped from both paths (a literal
    "(unknown)" placeholder was emitted instead of the iterated *filename*),
    so every file in a folder collapsed onto the same bogus path.
    """
    files = []

    def traverse(node, current_path, base_server_path):
        for key, value in node.items():
            if key == "_files":
                for filename, metadata in value.items():
                    if current_path:
                        server_path = f"{base_server_path}/{current_path}/{filename}"
                        local_path = f"{current_path}/{filename}"
                    else:
                        server_path = f"{base_server_path}/{filename}"
                        local_path = filename
                    files.append((server_path, local_path, metadata))
            elif isinstance(value, dict):
                new_path = f"{current_path}/{key}" if current_path else key
                traverse(value, new_path, base_server_path)

    for folder_name, folder_structure in structure.items():
        # Base server path is the PARENT of the folder, because traverse()
        # re-adds the folder name itself as the first local path component.
        base_path = folder_paths[folder_name]
        traverse({folder_name: folder_structure}, "", base_path.rsplit("/", 1)[0])

    return files

150 

151 

def should_download(remote_meta, local_path):
    """Decide whether the remote file needs (re)downloading.

    Returns True when the local copy is missing, differs in size, or is older
    than the remote ``modified`` timestamp (an ISO-8601 string, possibly with
    a trailing "Z"; compared in UTC).

    Fix: stat the file once instead of twice — avoids a redundant syscall and
    a small window where size and mtime could come from different versions.
    """
    if not local_path.exists():
        return True
    stat = local_path.stat()
    local_mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
    remote_mtime = datetime.fromisoformat(remote_meta["modified"].replace("Z", "+00:00"))
    return stat.st_size != remote_meta["size"] or local_mtime < remote_mtime

159 

160 

def download_file(client, site_url, file_server_relative_url, local_path):
    """Stream one file from SharePoint into *local_path*.

    Creates parent directories as needed and returns the size in bytes of the
    file as written on disk.
    """
    url = f"{site_url}/_api/web/GetFileByServerRelativeUrl('{file_server_relative_url}')/$value"
    local_path.parent.mkdir(parents=True, exist_ok=True)

    with stream_with_retry(client, url) as response, open(local_path, "wb") as out:
        for chunk in response.iter_bytes(chunk_size=8192):
            out.write(chunk)

    return local_path.stat().st_size

172 

173 

def collect_all_remote_paths(structure, folder_paths):
    """Return the set of expected local ``Path``s for every remote file."""
    entries = collect_files_from_structure(structure, folder_paths)
    return {Path(local) for _, local, _ in entries}

176 

177 

def remove_orphans(output_dir, remote_paths):
    """Delete local files under *output_dir* not present in *remote_paths*.

    ``structure.json`` is always preserved. Returns the number of files removed.
    """
    local_files = set()
    for path in output_dir.rglob("*"):
        if path.is_file() and path.name != "structure.json":
            local_files.add(path.relative_to(output_dir))
    orphans = local_files - remote_paths
    for orphan in orphans:
        (output_dir / orphan).unlink()
        console.print(f"[yellow]Removed: {orphan}")
    return len(orphans)

185 

186 

def download_all_files(client, site_url, structure, output_dir, folder_paths):
    """Download every file described by *structure* into *output_dir*.

    Up-to-date local copies (per ``should_download``) are skipped; failures are
    reported and counted but do not abort the run. Afterwards, local files not
    present remotely are pruned and a summary line is printed.
    """
    files = collect_files_from_structure(structure, folder_paths)

    counts = {"downloaded": 0, "updated": 0, "skipped": 0, "failed": 0}

    columns = (
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeRemainingColumn(),
    )
    with Progress(*columns, console=console) as progress:
        task_id = progress.add_task("Downloading...", total=len(files))

        for server_path, local_rel_path, metadata in files:
            local_path = output_dir / local_rel_path
            progress.update(task_id, description=f"[cyan]{local_rel_path}")

            if should_download(metadata, local_path):
                try:
                    # Whether the file existed beforehand decides updated vs downloaded.
                    existed_before = local_path.exists()
                    download_file(client, site_url, server_path, local_path)
                except Exception as e:
                    console.print(f"[red]Failed: {local_rel_path} ({e})")
                    counts["failed"] += 1
                else:
                    counts["updated" if existed_before else "downloaded"] += 1
            else:
                counts["skipped"] += 1

            progress.advance(task_id)

    remote_paths = collect_all_remote_paths(structure, folder_paths)
    removed = remove_orphans(output_dir, remote_paths)

    console.print(
        f"Downloaded: {counts['downloaded']}, Updated: {counts['updated']}, "
        f"Skipped: {counts['skipped']}, Failed: {counts['failed']}, Removed: {removed}"
    )

231 

232 

def main():  # pragma: no cover
    """CLI entry point: discover a SharePoint site's files, then download them.

    Usage: CONFIG OUTPUT_DIR [--structure-only] [--structure FILE]

    Phase 1 crawls the site (or loads a saved structure.json via --structure)
    and saves the structure; Phase 2 downloads the files, unless
    --structure-only was given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("config", type=Path)
    parser.add_argument("output_dir", type=Path)
    parser.add_argument("--structure-only", action="store_true")
    parser.add_argument("--structure", type=Path, help="Path to existing structure JSON file")
    args = parser.parse_args()

    config = load_config(args.config)

    # SharePoint auth is cookie-based: FedAuth and rtFa values come from the config.
    site_url = config["site_url"]
    fedauth = config["fedauth"]
    rtfa = config["rtfa"]
    folders = config["folders"]

    if args.structure:
        # Reuse a previously saved structure.json instead of re-crawling the site.
        console.print("[bold blue][Phase 1][/] Loading structure from file...")
        with open(args.structure) as f:
            data = json.load(f)
        structure = data["structure"]
        folder_paths = data["folder_paths"]
        console.print(f"Loaded structure from {args.structure}")
    else:
        console.print("[bold blue][Phase 1][/] Discovering files...")
        # odata=verbose makes the REST API wrap results under ["d"]["results"].
        json_headers = {
            "Cookie": f"FedAuth={fedauth}; rtFa={rtfa}",
            "Accept": "application/json;odata=verbose",
        }

        with Progress(
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            console=console,
        ) as progress:
            with httpx.Client(headers=json_headers) as client:
                structure, folder_paths = extract_structure(client, site_url, folders, progress)

        args.output_dir.mkdir(parents=True, exist_ok=True)

        # Persist the discovered structure so later runs can pass --structure.
        structure_file = args.output_dir / "structure.json"
        output = {
            "site_url": site_url,
            "extracted_at": datetime.now(timezone.utc).isoformat(),
            "structure": structure,
            "folder_paths": folder_paths,
        }
        with open(structure_file, "w") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)
        console.print(f"Structure saved to {structure_file}")

    if args.structure_only:
        return

    console.print("[bold blue][Phase 2][/] Downloading files...")

    args.output_dir.mkdir(parents=True, exist_ok=True)

    # Downloads need only the auth cookies, not the JSON Accept header.
    download_headers = {
        "Cookie": f"FedAuth={fedauth}; rtFa={rtfa}",
    }

    # Generous timeout: individual files may be large and streamed slowly.
    with httpx.Client(headers=download_headers, timeout=300) as client:
        download_all_files(client, site_url, structure, args.output_dir, folder_paths)

298 

# Allow running this module directly as a script.
if __name__ == "__main__":  # pragma: no cover
    main()