Update user password. SECURITY: Users can only change their own password. This function: (1) validates that user_id matches current_user.id (IDOR protection); (2) verifies the old password before allowing the change; and (3) validates password complexity requirements.
(
request: Request,
db_session: AsyncSession,
current_user: PublicUser | AnonymousUser,
user_id: int,
form: UserUpdatePassword,
)
| 458 | |
| 459 | |
| 460 | async def update_user_password( |
| 461 | request: Request, |
| 462 | db_session: AsyncSession, |
| 463 | current_user: PublicUser | AnonymousUser, |
| 464 | user_id: int, |
| 465 | form: UserUpdatePassword, |
| 466 | ): |
| 467 | """ |
| 468 | Update user password. |
| 469 | |
| 470 | SECURITY: Users can only change their own password. This function: |
| 471 | 1. Validates that user_id matches current_user.id (IDOR protection) |
| 472 | 2. Verifies the old password before allowing change |
| 473 | 3. Validates password complexity requirements |
| 474 | """ |
| 475 | |
| 476 | # Users can ONLY change their own password |
| 477 | if current_user.id != user_id: |
| 478 | raise HTTPException( |
| 479 | status_code=status.HTTP_403_FORBIDDEN, |
| 480 | detail="You can only change your own password", |
| 481 | ) |
| 482 | |
| 483 | # Validate new password complexity |
| 484 | validation_result = validate_password_complexity(form.new_password) |
| 485 | if not validation_result.is_valid: |
| 486 | raise HTTPException( |
| 487 | status_code=400, |
| 488 | detail={ |
| 489 | "code": "WEAK_PASSWORD", |
| 490 | "message": "Password does not meet security requirements", |
| 491 | "errors": validation_result.errors, |
| 492 | "requirements": validation_result.requirements, |
| 493 | }, |
| 494 | ) |
| 495 | |
| 496 | # Get user (we already verified it's the current user) |
| 497 | statement = select(User).where(User.id == user_id) |
| 498 | user = (await db_session.execute(statement)).scalars().first() |
| 499 | |
| 500 | if not user: |
| 501 | raise HTTPException( |
| 502 | status_code=400, |
| 503 | detail="User does not exist", |
| 504 | ) |
| 505 | |
| 506 | # Verify old password before allowing change |
| 507 | if not security_verify_password(form.old_password, user.password): |
| 508 | raise HTTPException( |
| 509 | status_code=status.HTTP_401_UNAUTHORIZED, detail="Wrong password" |
| 510 | ) |
| 511 | |
| 512 | # Update user |
| 513 | user.password = security_hash_password(form.new_password) |
| 514 | user.update_date = str(datetime.now()) |
| 515 | |
| 516 | # Update user in database |
| 517 | db_session.add(user) |