Coverage for src/piccione/download/from_sharepoint.py: 99%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-03-21 11:49 +0000

1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import json 

7import time 

8from contextlib import contextmanager 

9from datetime import datetime, timezone 

10from pathlib import Path 

11 

12import httpx 

13import yaml 

14from rich.console import Console 

15from rich.progress import ( 

16 BarColumn, 

17 Progress, 

18 TaskProgressColumn, 

19 TextColumn, 

20 TimeRemainingColumn, 

21) 

22 

# Shared Rich console used for all user-facing status output in this module.
console = Console()

24 

25 

def load_config(config_path):
    """Parse the YAML configuration file at *config_path* into a Python object."""
    with open(config_path) as config_file:
        return yaml.safe_load(config_file)

29 

30 

def get_site_relative_url(site_url):
    """Strip scheme and host from *site_url*, keeping the server-relative part.

    e.g. "https://tenant.sharepoint.com/sites/team" -> "/sites/team".
    """
    path_segments = site_url.rstrip("/").split("/")[3:]
    return "/{}".format("/".join(path_segments))

33 

34 

def sort_structure(obj):
    """Recursively order *obj*: dict keys sorted with "_files" last, lists sorted.

    Non-dict, non-list values are returned unchanged.
    """
    if isinstance(obj, dict):
        # Sorting on (is "_files", name) pushes the "_files" entry to the end.
        ordered_keys = sorted(obj, key=lambda name: (name == "_files", name))
        return {name: sort_structure(obj[name]) for name in ordered_keys}
    if isinstance(obj, list):
        return sorted(obj)
    return obj

44 

45 

def request_with_retry(client, url, max_retries=5):  # pragma: no cover
    """GET *url* via *client*, retrying with exponential backoff on HTTP 429.

    Raises for any other HTTP error status, and raises a plain Exception
    once *max_retries* consecutive 429 responses have been seen.
    """
    for attempt in range(max_retries):
        response = client.get(url)
        if response.status_code != 429:
            response.raise_for_status()
            return response
        # Throttled: back off 1s, 2s, 4s, ... before the next attempt.
        time.sleep(2 ** attempt)
    raise Exception(f"Rate limited after {max_retries} retries for {url}")

56 

57 

@contextmanager
def stream_with_retry(client, url, max_retries=5):  # pragma: no cover
    """Context manager yielding a streaming GET response for *url*.

    Retries with exponential backoff on HTTP 429, raises for other HTTP
    errors, and raises a plain Exception once *max_retries* consecutive
    429 responses have been seen.
    """
    for attempt in range(max_retries):
        with client.stream("GET", url) as response:
            status = response.status_code
            if status == 429:
                # Throttled: back off 1s, 2s, 4s, ... then retry.
                time.sleep(2 ** attempt)
                continue
            if status >= 400:
                response.raise_for_status()
            yield response
            return
    raise Exception(f"Rate limited after {max_retries} retries for {url}")

71 

72 

def get_folder_contents(client, site_url, folder_path):
    """Fetch the immediate subfolders and files of *folder_path*.

    Queries the SharePoint REST API and returns (folders, files), each a
    list of raw result dicts taken from the verbose-OData "d.results" payload.
    """
    base = f"{site_url}/_api/web/GetFolderByServerRelativeUrl('{folder_path}')"

    folders = request_with_retry(client, f"{base}/Folders").json()["d"]["results"]
    files = request_with_retry(client, f"{base}/Files").json()["d"]["results"]

    return folders, files

83 

84 

def get_folder_structure(client, site_url, folder_path):
    """Recursively build a nested dict describing *folder_path*.

    Each subfolder becomes a key mapping to its own structure; files, if any,
    are collected under the special "_files" key as
    name -> {"size", "modified", "etag"}. SharePoint system folders
    ("Forms" and names starting with "_") are skipped.
    """
    structure = {}

    subfolders, files = get_folder_contents(client, site_url, folder_path)

    for subfolder in subfolders:
        folder_name = subfolder["Name"]
        # Skip SharePoint system folders.
        if folder_name == "Forms" or folder_name.startswith("_"):
            continue
        structure[folder_name] = get_folder_structure(
            client, site_url, subfolder["ServerRelativeUrl"]
        )

    if files:
        file_index = {}
        for entry in files:
            file_index[entry["Name"]] = {
                "size": int(entry["Length"]),
                "modified": entry["TimeLastModified"],
                "etag": entry["ETag"],
            }
        structure["_files"] = file_index

    return structure

108 

109 

def process_folder(client, folder_path, site_url, progress, task_id):
    """Scan one top-level folder, advancing *progress* when done.

    Returns (folder_name, folder_path, structure).
    """
    name = folder_path.rsplit("/", 1)[-1]
    progress.update(task_id, description=f"Scanning {name}...")
    tree = get_folder_structure(client, site_url, folder_path)
    progress.advance(task_id)
    return name, folder_path, tree

116 

117 

def extract_structure(client, site_url, folders, progress):
    """Discover the full file tree for every folder name in *folders*.

    Returns (structure, folder_paths): the sorted nested structure keyed by
    folder name, and a map of folder name -> server-relative path.
    """
    site_relative_url = get_site_relative_url(site_url)

    task_id = progress.add_task("Discovering...", total=len(folders))

    structure = {}
    folder_paths = {}
    for folder in folders:
        # Accept folder names with or without a leading slash.
        suffix = folder if folder.startswith("/") else f"/{folder}"
        server_path = site_relative_url + suffix
        name, path, tree = process_folder(client, server_path, site_url, progress, task_id)
        structure[name] = tree
        folder_paths[name] = path

    return sort_structure(structure), folder_paths

133 

134 

def collect_files_from_structure(structure, folder_paths):
    """Flatten *structure* into a list of downloadable files.

    Returns a list of (server_path, local_path, metadata) tuples, where
    server_path is the file's server-relative URL, local_path is its path
    relative to the output directory, and metadata is the
    {"size", "modified", "etag"} dict recorded during discovery.
    """
    files = []

    def traverse(node, current_path, base_server_path):
        for key, value in node.items():
            if key == "_files":
                for filename, metadata in value.items():
                    # Fix: interpolate the actual file name — previously a
                    # literal "(unknown)" placeholder was emitted, collapsing
                    # every file in a folder onto the same bogus path.
                    if current_path:
                        server_path = f"{base_server_path}/{current_path}/{filename}"
                        local_path = f"{current_path}/{filename}"
                    else:
                        server_path = f"{base_server_path}/{filename}"
                        local_path = filename
                    files.append((server_path, local_path, metadata))
            elif isinstance(value, dict):
                new_path = f"{current_path}/{key}" if current_path else key
                traverse(value, new_path, base_server_path)

    for folder_name, folder_structure in structure.items():
        # The parent of the folder's server path is the base for all files below it.
        base_path = folder_paths[folder_name]
        traverse({folder_name: folder_structure}, "", base_path.rsplit("/", 1)[0])

    return files

154 

155 

def should_download(remote_meta, local_path):
    """Decide whether the remote file needs to be (re)fetched.

    Download when the local copy is missing, differs in size, or is older
    than the remote "modified" timestamp (ISO 8601, possibly Z-suffixed).
    """
    if not local_path.exists():
        return True
    stat = local_path.stat()
    if stat.st_size != remote_meta["size"]:
        return True
    local_mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
    remote_mtime = datetime.fromisoformat(remote_meta["modified"].replace("Z", "+00:00"))
    return local_mtime < remote_mtime

163 

164 

def download_file(client, site_url, file_server_relative_url, local_path):
    """Stream the file at *file_server_relative_url* down to *local_path*.

    Creates parent directories as needed; returns the byte size of the
    written file.
    """
    download_url = f"{site_url}/_api/web/GetFileByServerRelativeUrl('{file_server_relative_url}')/$value"

    local_path.parent.mkdir(parents=True, exist_ok=True)

    # Stream in 8 KiB chunks so large files never live fully in memory.
    with stream_with_retry(client, download_url) as response, open(local_path, "wb") as out:
        for chunk in response.iter_bytes(chunk_size=8192):
            out.write(chunk)

    return local_path.stat().st_size

176 

177 

def collect_all_remote_paths(structure, folder_paths):
    """Return the set of expected local Paths for every remote file."""
    entries = collect_files_from_structure(structure, folder_paths)
    return {Path(local) for _, local, _ in entries}

180 

181 

def remove_orphans(output_dir, remote_paths):
    """Delete local files under *output_dir* that no longer exist remotely.

    structure.json is always preserved. Returns the number of files removed.
    """
    local_files = {
        path.relative_to(output_dir)
        for path in output_dir.rglob("*")
        if path.is_file() and path.name != "structure.json"
    }
    orphans = local_files - remote_paths
    for orphan in orphans:
        (output_dir / orphan).unlink()
        console.print(f"[yellow]Removed: {orphan}")
    return len(orphans)

189 

190 

def download_all_files(client, site_url, structure, output_dir, folder_paths):
    """Download every file described by *structure* into *output_dir*.

    Skips files whose local copy is already up to date, removes local
    orphans no longer present remotely, and prints a one-line summary of
    downloaded/updated/skipped/failed/removed counts.
    """
    files = collect_files_from_structure(structure, folder_paths)
    total = len(files)

    # Outcome counters for the final summary line.
    downloaded = 0
    updated = 0
    skipped = 0
    failed = 0

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeRemainingColumn(),
        console=console,
    ) as progress:
        task_id = progress.add_task("Downloading...", total=total)

        for server_path, local_rel_path, metadata in files:
            local_path = output_dir / local_rel_path
            progress.update(task_id, description=f"[cyan]{local_rel_path}")

            # Local copy already up to date: nothing to do.
            if not should_download(metadata, local_path):
                skipped += 1
                progress.advance(task_id)
                continue

            try:
                # Checked before the download, since the download creates the file.
                was_update = local_path.exists()
                download_file(client, site_url, server_path, local_path)
                if was_update:
                    updated += 1
                else:
                    downloaded += 1
            except Exception as e:
                # Best-effort: report the failure and continue with the rest.
                console.print(f"[red]Failed: {local_rel_path} ({e})")
                failed += 1

            progress.advance(task_id)

    # Prune local files that no longer exist on the remote site.
    remote_paths = collect_all_remote_paths(structure, folder_paths)
    removed = remove_orphans(output_dir, remote_paths)

    console.print(f"Downloaded: {downloaded}, Updated: {updated}, Skipped: {skipped}, Failed: {failed}, Removed: {removed}")

235 

236 

def main():  # pragma: no cover
    """CLI entry point: discover the SharePoint file tree, then mirror it locally.

    Phase 1 builds (or loads, with --structure) the remote structure and
    saves it to structure.json in the output directory; Phase 2 downloads
    the files unless --structure-only was given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("config", type=Path)
    parser.add_argument("output_dir", type=Path)
    parser.add_argument("--structure-only", action="store_true")
    parser.add_argument("--structure", type=Path, help="Path to existing structure JSON file")
    args = parser.parse_args()

    config = load_config(args.config)

    # Auth cookies and the list of top-level folders come from the YAML config.
    site_url = config["site_url"]
    fedauth = config["fedauth"]
    rtfa = config["rtfa"]
    folders = config["folders"]

    if args.structure:
        # Reuse a previously saved discovery result instead of re-scanning.
        console.print("[bold blue][Phase 1][/] Loading structure from file...")
        with open(args.structure) as f:
            data = json.load(f)
        structure = data["structure"]
        folder_paths = data["folder_paths"]
        console.print(f"Loaded structure from {args.structure}")
    else:
        console.print("[bold blue][Phase 1][/] Discovering files...")
        # Discovery requests ask for verbose-OData JSON responses.
        json_headers = {
            "Cookie": f"FedAuth={fedauth}; rtFa={rtfa}",
            "Accept": "application/json;odata=verbose",
        }

        with Progress(
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            console=console,
        ) as progress:
            with httpx.Client(headers=json_headers) as client:
                structure, folder_paths = extract_structure(client, site_url, folders, progress)

        args.output_dir.mkdir(parents=True, exist_ok=True)

        # Persist the discovery result so later runs can pass --structure.
        structure_file = args.output_dir / "structure.json"
        output = {
            "site_url": site_url,
            "extracted_at": datetime.now(timezone.utc).isoformat(),
            "structure": structure,
            "folder_paths": folder_paths,
        }
        with open(structure_file, "w") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)
        console.print(f"Structure saved to {structure_file}")

    if args.structure_only:
        return

    console.print("[bold blue][Phase 2][/] Downloading files...")

    args.output_dir.mkdir(parents=True, exist_ok=True)

    # Downloads only need the auth cookies; generous timeout for large files.
    download_headers = {
        "Cookie": f"FedAuth={fedauth}; rtFa={rtfa}",
    }

    with httpx.Client(headers=download_headers, timeout=300) as client:
        download_all_files(client, site_url, structure, args.output_dir, folder_paths)

301 

302 

# Allow running the module directly as a script.
if __name__ == "__main__":  # pragma: no cover
    main()