async def require_scope(request: Request, scope: str) -> AuthContext:
    """Authenticate *request* and return the resulting ``AuthContext``.

    Credentials are tried in this order:

    1. A raw API key (``_extract_raw_api_key``), checked against *scope* by
       ``_require_api_key_scope``.
    2. A dashboard JWT (``_extract_dashboard_jwt``), verified with HS256
       against ``request.app.state.jwt_secret``.
    3. If the JWT fails verification and the ``Authorization`` header uses
       the ``Bearer `` prefix, the same token string is retried as an API key.

    Security notes:
        * The JWT path returns ``scopes=["*"]`` and never compares the
          requested *scope*; a valid dashboard token is accepted for every
          scope this function is asked about.
        * ``algorithms`` is pinned to ``["HS256"]``, so a token whose header
          declares any other ``alg`` is rejected by ``jwt.decode``.

    Raises:
        ApiError: status 401 when no credential is present, the JWT is
            expired, the JWT is invalid and the API-key retry does not
            apply, or the ``username`` claim is missing or blank. Errors
            raised by ``_require_api_key_scope`` propagate to the caller.
    """
    api_key = _extract_raw_api_key(request)
    if api_key:
        return await _require_api_key_scope(request, api_key, scope)

    dashboard_token = _extract_dashboard_jwt(request)
    if not dashboard_token:
        raise ApiError("Missing API key", status_code=401)

    try:
        # NOTE(review): no `options` are passed, so `exp` is only validated
        # when the token carries it. Whether the issuer always sets `exp` is
        # not visible here -- confirm, or consider requiring the claim with
        # options={"require": ["exp"]}.
        claims = jwt.decode(
            dashboard_token,
            request.app.state.jwt_secret,
            algorithms=["HS256"],
        )
    except jwt.ExpiredSignatureError as exc:
        # Expired tokens are rejected outright; they are never retried as
        # an API key.
        raise ApiError("Token expired", status_code=401) from exc
    except jwt.InvalidTokenError as exc:
        authorization = request.headers.get("Authorization", "").strip()
        if not authorization.startswith("Bearer "):
            raise ApiError("Invalid token", status_code=401) from exc
        # A Bearer value that is not a valid JWT may be an API key, so hand
        # it to the API-key path, which enforces *scope*.
        # NOTE(review): the retry is gated on the Authorization header, but
        # the token comes from _extract_dashboard_jwt, whose source is not
        # visible here -- confirm both always refer to the same value.
        try:
            return await _require_api_key_scope(request, dashboard_token, scope)
        except ApiError as api_key_exc:
            # Surface the API-key error, keeping the JWT failure as its cause.
            raise api_key_exc from exc

    username = claims.get("username")
    if isinstance(username, str) and username.strip():
        # Dashboard sessions receive wildcard scopes; *scope* is not checked.
        return AuthContext(username=username, scopes=["*"], via="jwt")
    raise ApiError("Invalid token", status_code=401)
| 174 | |
| 175 | |
| 176 | def get_auth_service(request: Request) -> AuthService: |