Validate JWT token Args: token: JWT token Returns: dict: Dictionary containing user information Raises: HTTPException: If token is invalid or expired
(self, token: str)
| 118 | return jwt.encode(payload.model_dump(), self.secret, algorithm=self.algorithm) |
| 119 | |
| 120 | def validate_token(self, token: str) -> dict: |
| 121 | """ |
| 122 | Validate JWT token |
| 123 | |
| 124 | Args: |
| 125 | token: JWT token |
| 126 | |
| 127 | Returns: |
| 128 | dict: Dictionary containing user information |
| 129 | |
| 130 | Raises: |
| 131 | HTTPException: If token is invalid or expired |
| 132 | """ |
| 133 | try: |
| 134 | # Explicitly exclude 'none' to prevent algorithm confusion attacks |
| 135 | allowed_algorithms = [self.algorithm] |
| 136 | if "none" in (a.lower() for a in allowed_algorithms): |
| 137 | raise HTTPException( |
| 138 | status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| 139 | detail="Insecure JWT algorithm configuration", |
| 140 | ) |
| 141 | payload = jwt.decode(token, self.secret, algorithms=allowed_algorithms) |
| 142 | expire_timestamp = payload["exp"] |
| 143 | expire_time = datetime.fromtimestamp(expire_timestamp, timezone.utc) |
| 144 | |
| 145 | if datetime.now(timezone.utc) > expire_time: |
| 146 | raise HTTPException( |
| 147 | status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired" |
| 148 | ) |
| 149 | |
| 150 | # Return complete payload instead of just username |
| 151 | return { |
| 152 | "username": payload["sub"], |
| 153 | "role": payload.get("role", "user"), |
| 154 | "metadata": payload.get("metadata", {}), |
| 155 | "exp": expire_time, |
| 156 | } |
| 157 | except jwt.PyJWTError: |
| 158 | raise HTTPException( |
| 159 | status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token" |
| 160 | ) |
| 161 | |
| 162 | |
| 163 | auth_handler = AuthHandler() |