| 251 | self.shutdown_event = shutdown_event |
| 252 | |
| 253 | async def auth_middleware(self, current_request: Request): |
| 254 | path = current_request.url.path |
| 255 | if not path.startswith("/api"): |
| 256 | return None |
| 257 | rate_limit_response = await self._apply_auth_rate_limit(current_request, path) |
| 258 | if rate_limit_response is not None: |
| 259 | return rate_limit_response |
| 260 | if path.startswith("/api/v1"): |
| 261 | return None |
| 262 | |
| 263 | allowed_exact_endpoints = { |
| 264 | "/api/auth/login", |
| 265 | "/api/auth/logout", |
| 266 | "/api/auth/setup-status", |
| 267 | "/api/auth/setup", |
| 268 | "/api/stat/versions", |
| 269 | } |
| 270 | allowed_endpoint_prefixes = [ |
| 271 | "/api/file", |
| 272 | "/api/v1/files/tokens", |
| 273 | "/api/platform/webhook", |
| 274 | "/api/stat/start-time", |
| 275 | "/api/backup/download", # 备份下载使用 URL 参数传递 token |
| 276 | ] |
| 277 | if path in allowed_exact_endpoints or any( |
| 278 | path.startswith(prefix) for prefix in allowed_endpoint_prefixes |
| 279 | ): |
| 280 | return None |
| 281 | is_plugin_page_path = PluginPageAuth.is_protected_path(path) |
| 282 | dashboard_token = self._extract_dashboard_jwt(current_request) |
| 283 | asset_token = ( |
| 284 | PluginPageAuth.extract_asset_token(current_request.query_params) |
| 285 | if is_plugin_page_path |
| 286 | else None |
| 287 | ) |
| 288 | token_candidates = [] |
| 289 | if dashboard_token: |
| 290 | token_candidates.append(dashboard_token) |
| 291 | if asset_token and asset_token != dashboard_token: |
| 292 | token_candidates.append(asset_token) |
| 293 | if not token_candidates: |
| 294 | r = JSONResponse(error("未授权")) |
| 295 | r.status_code = 401 |
| 296 | return r |
| 297 | |
| 298 | token_errors: list[str] = [] |
| 299 | for token in token_candidates: |
| 300 | payload, token_error = self._validate_dashboard_token(token, path) |
| 301 | if payload is not None: |
| 302 | current_request.state.dashboard_g.username = cast( |
| 303 | str, payload["username"] |
| 304 | ) |
| 305 | return None |
| 306 | token_errors.append(token_error) |
| 307 | |
| 308 | error_message = ( |
| 309 | "Token 过期" |
| 310 | if token_errors and all(item == "Token 过期" for item in token_errors) |