Validates the refresh token and issues a new access token + rotated refresh token. The refresh token is read from cookies. Applies the same ``password_changed_at`` and logout-revocation checks as ``get_current_user`` — a refresh must not outlive either. Rotates the refresh cookie on every call; the old token's ``jti`` is marked consumed in Redis, and replay is treated as theft (all sessions revoked).
(
request: Request,
response: Response,
db_session: AsyncSession = Depends(get_db_session),
)
| 192 | }, |
| 193 | ) |
| 194 | async def refresh( |
| 195 | request: Request, |
| 196 | response: Response, |
| 197 | db_session: AsyncSession = Depends(get_db_session), |
| 198 | ): |
| 199 | """ |
| 200 | Validates the refresh token and issues a new access token + rotated refresh |
| 201 | token. The refresh token is read from cookies. |
| 202 | |
| 203 | Applies the same ``password_changed_at`` and logout-revocation checks as |
| 204 | ``get_current_user`` — a refresh must not outlive either. Rotates the |
| 205 | refresh cookie on every call; the old token's ``jti`` is marked consumed |
| 206 | in Redis, and replay is treated as theft (all sessions revoked). |
| 207 | """ |
| 208 | # Rate limit refresh endpoint to prevent brute force attacks |
| 209 | is_allowed, retry_after = check_refresh_rate_limit(request) |
| 210 | if not is_allowed: |
| 211 | raise HTTPException( |
| 212 | status_code=status.HTTP_429_TOO_MANY_REQUESTS, |
| 213 | detail={ |
| 214 | "code": "RATE_LIMITED", |
| 215 | "message": "Too many refresh attempts. Please try again later.", |
| 216 | "retry_after": retry_after, |
| 217 | }, |
| 218 | ) |
| 219 | |
| 220 | credentials_exception = HTTPException( |
| 221 | status_code=status.HTTP_401_UNAUTHORIZED, |
| 222 | detail="Invalid credentials", |
| 223 | headers={"WWW-Authenticate": "Bearer"}, |
| 224 | ) |
| 225 | |
| 226 | refresh_token = request.cookies.get(JWT_REFRESH_COOKIE_NAME) |
| 227 | if not refresh_token: |
| 228 | raise credentials_exception |
| 229 | |
| 230 | payload = decode_refresh_token(refresh_token) |
| 231 | if not payload: |
| 232 | raise credentials_exception |
| 233 | |
| 234 | email = payload.get("sub") |
| 235 | if not email: |
| 236 | raise credentials_exception |
| 237 | |
| 238 | user = await security_get_user(request, db_session, email=email) |
| 239 | if user is None or user.id is None: |
| 240 | raise credentials_exception |
| 241 | |
| 242 | # Enforce password-change cutover: tokens minted before the user's last |
| 243 | # password change are stale. |
| 244 | iat_raw = payload.get("iat") |
| 245 | issued_at = None |
| 246 | if iat_raw: |
| 247 | try: |
| 248 | issued_at = datetime.fromtimestamp(iat_raw, tz=timezone.utc) |
| 249 | except (TypeError, ValueError, OSError): |
| 250 | issued_at = None |
| 251 |
Nothing calls this function directly.
No test coverage detected.