| 343 | |
| 344 | |
| 345 | async def get_current_user( |
| 346 | request: Request, |
| 347 | db_session: AsyncSession = Depends(get_db_session), |
| 348 | ) -> Union[PublicUser, APITokenUser, SuperadminAPITokenUser, AnonymousUser]: |
| 349 | credentials_exception = HTTPException( |
| 350 | status_code=status.HTTP_401_UNAUTHORIZED, |
| 351 | detail="Could not validate credentials", |
| 352 | headers={"WWW-Authenticate": "Bearer"}, |
| 353 | ) |
| 354 | |
| 355 | # Step 1: Check for API token (Bearer lh_... or Bearer lh_sa_...) |
| 356 | auth_header = request.headers.get("Authorization", "").strip() |
| 357 | auth_lower = auth_header.lower() |
| 358 | |
| 359 | # ORDER MATTERS: the superadmin prefix "lh_sa_" also starts with "lh_", |
| 360 | # so the broader org-token branch below would shadow it. Keep this |
| 361 | # branch above the org-token branch — do not reorder for "alphabetical |
| 362 | # cleanliness" — or every superadmin token will silently fall through |
| 363 | # to org-token validation and fail. |
| 364 | if auth_lower.startswith("bearer lh_sa_"): |
| 365 | token = auth_header[7:].strip() # strip "Bearer " |
| 366 | sa_user = await validate_superadmin_api_token(token, db_session) |
| 367 | if sa_user: |
| 368 | # No _verify_api_token_org_boundary call: superadmin tokens are |
| 369 | # cross-org by design and have no org to bind to. |
| 370 | request.state.user = sa_user |
| 371 | request.state.is_api_token = True |
| 372 | request.state.is_superadmin_api_token = True |
| 373 | return sa_user |
| 374 | raise credentials_exception |
| 375 | |
| 376 | # Case-insensitive check for "Bearer " prefix with lh_ token (org-scoped) |
| 377 | if auth_lower.startswith("bearer lh_"): |
| 378 | token = auth_header[7:].strip() # Remove "Bearer " prefix and trim |
| 379 | api_token_user = await validate_api_token(token, db_session) |
| 380 | if api_token_user: |
| 381 | # Verify org boundary: if the URL contains an org_id or org_slug, |
| 382 | # ensure it matches the token's organization |
| 383 | await _verify_api_token_org_boundary(request, api_token_user, db_session) |
| 384 | request.state.user = api_token_user |
| 385 | request.state.is_api_token = True |
| 386 | return api_token_user |
| 387 | raise credentials_exception |
| 388 | |
| 389 | # Step 2: Fall back to JWT logic using PyJWT |
| 390 | token = extract_jwt_from_request(request) |
| 391 | username = None |
| 392 | |
| 393 | if token: |
| 394 | payload = decode_jwt(token) |
| 395 | if payload: |
| 396 | # Reject tokens minted for a single-purpose flow (e.g. magic-link |
| 397 | # one-time sign-in, password reset, email verification). Those tokens |
| 398 | # are only valid at their specific consume endpoint — allowing them as |
| 399 | # session tokens would let an intercepted single-use token act as a |
| 400 | # full session for its entire TTL. |
| 401 | token_purpose = payload.get("purpose") |
| 402 | SINGLE_USE_PURPOSES = { |