MCPcopy
hub / github.com/learnhouse/learnhouse / get_current_user

Function get_current_user

apps/api/src/security/auth.py:345–452  ·  view source on GitHub ↗
(
    request: Request,
    db_session: AsyncSession = Depends(get_db_session),
)

Source from the content-addressed store, hash-verified

343
344
345async def get_current_user(
346 request: Request,
347 db_session: AsyncSession = Depends(get_db_session),
348) -> Union[PublicUser, APITokenUser, SuperadminAPITokenUser, AnonymousUser]:
349 credentials_exception = HTTPException(
350 status_code=status.HTTP_401_UNAUTHORIZED,
351 detail="Could not validate credentials",
352 headers={"WWW-Authenticate": "Bearer"},
353 )
354
355 # Step 1: Check for API token (Bearer lh_... or Bearer lh_sa_...)
356 auth_header = request.headers.get("Authorization", "").strip()
357 auth_lower = auth_header.lower()
358
359 # ORDER MATTERS: the superadmin prefix "lh_sa_" also starts with "lh_",
360 # so the broader org-token branch below would shadow it. Keep this
361 # branch above the org-token branch — do not reorder for "alphabetical
362 # cleanliness" — or every superadmin token will silently fall through
363 # to org-token validation and fail.
364 if auth_lower.startswith("bearer lh_sa_"):
365 token = auth_header[7:].strip() # strip "Bearer "
366 sa_user = await validate_superadmin_api_token(token, db_session)
367 if sa_user:
368 # No _verify_api_token_org_boundary call: superadmin tokens are
369 # cross-org by design and have no org to bind to.
370 request.state.user = sa_user
371 request.state.is_api_token = True
372 request.state.is_superadmin_api_token = True
373 return sa_user
374 raise credentials_exception
375
376 # Case-insensitive check for "Bearer " prefix with lh_ token (org-scoped)
377 if auth_lower.startswith("bearer lh_"):
378 token = auth_header[7:].strip() # Remove "Bearer " prefix and trim
379 api_token_user = await validate_api_token(token, db_session)
380 if api_token_user:
381 # Verify org boundary: if the URL contains an org_id or org_slug,
382 # ensure it matches the token's organization
383 await _verify_api_token_org_boundary(request, api_token_user, db_session)
384 request.state.user = api_token_user
385 request.state.is_api_token = True
386 return api_token_user
387 raise credentials_exception
388
389 # Step 2: Fall back to JWT logic using PyJWT
390 token = extract_jwt_from_request(request)
391 username = None
392
393 if token:
394 payload = decode_jwt(token)
395 if payload:
396 # Reject tokens minted for a single-purpose flow (e.g. magic-link
397 # one-time sign-in, password reset, email verification). Those tokens
398 # are only valid at their specific consume endpoint — allowing them as
399 # session tokens would let an intercepted single-use token act as a
400 # full session for its entire TTL.
401 token_purpose = payload.get("purpose")
402 SINGLE_USE_PURPOSES = {

Calls 13

security_get_userFunction · 0.90
PublicUserClass · 0.90
AnonymousUserClass · 0.90
validate_api_tokenFunction · 0.85
extract_jwt_from_requestFunction · 0.85
decode_jwtFunction · 0.85
TokenDataClass · 0.85
replaceMethod · 0.80
getMethod · 0.45