(key: str, code: str, ip: str = Depends(ip_limit["error"]))
| 304 | |
| 305 | @share_api.get("/download") |
| 306 | async def download_file(key: str, code: str, ip: str = Depends(ip_limit["error"])): |
| 307 | file_storage: FileStorageInterface = storages[settings.file_storage]() |
| 308 | normalized_code = normalize_share_code(code) |
| 309 | if await get_select_token(normalized_code) != key: |
| 310 | ip_limit["error"].add_ip(ip) |
| 311 | raise HTTPException(status_code=403, detail="下载鉴权失败") |
| 312 | has, file_code = await get_code_file_by_code(normalized_code, False) |
| 313 | if not has: |
| 314 | return APIResponse(code=404, detail="文件不存在") |
| 315 | assert isinstance(file_code, FileCodes) |
| 316 | return ( |
| 317 | APIResponse(detail=file_code.text) |
| 318 | if file_code.text |
| 319 | else await file_storage.get_file_response(file_code) |
| 320 | ) |
| 321 | |
| 322 | |
| 323 | chunk_api = APIRouter(prefix="/chunk", tags=["切片"]) |
nothing calls this directly
no test coverage detected