MCPcopy
hub / github.com/zenml-io/zenml / authenticate_external_user

Function authenticate_external_user

src/zenml/zen_server/auth.py:683–929  ·  view source on GitHub ↗

Implement external authentication. Args: external_access_token: The access token used to authenticate the user to the external authenticator. request: The request object. Returns: The authentication context reflecting the authenticated user. Raises:

(
    external_access_token: str, request: Optional[Request] = None
)

Source from the content-addressed store, hash-verified

681
682
683def authenticate_external_user(
684 external_access_token: str, request: Optional[Request] = None
685) -> AuthContext:
686 """Implement external authentication.
687
688 Args:
689 external_access_token: The access token used to authenticate the user
690 to the external authenticator.
691 request: The request object.
692
693 Returns:
694 The authentication context reflecting the authenticated user.
695
696 Raises:
697 AuthorizationException: If the external user could not be authorized.
698 """
699 config = server_config()
700 store = zen_store()
701
702 assert config.external_user_info_url is not None
703
704 # Use the external access token to extract the user information and
705 # permissions
706
707 # Get the user information from the external authenticator
708 user_info_url = config.external_user_info_url
709 headers = {"Authorization": "Bearer " + external_access_token}
710 headers.update(get_zenml_headers())
711 query_params = dict(server_id=str(config.get_external_server_id()))
712
713 try:
714 auth_response = requests.get(
715 user_info_url,
716 headers=headers,
717 params=urlencode(query_params),
718 timeout=EXTERNAL_AUTHENTICATOR_TIMEOUT,
719 )
720 except Exception as e:
721 logger.exception(
722 f"Error fetching user information from external authenticator: {e}"
723 )
724 raise AuthorizationException(
725 "Error fetching user information from external authenticator."
726 )
727
728 external_user: Optional[ExternalUserModel] = None
729
730 if 200 <= auth_response.status_code < 300:
731 try:
732 payload = auth_response.json()
733 except requests.exceptions.JSONDecodeError:
734 logger.exception(
735 "Error decoding JSON response from external authenticator."
736 )
737 raise AuthorizationException(
738 "Unknown external authenticator error"
739 )
740

Callers 3

tokenFunction · 0.90
authenticate_credentialsFunction · 0.85
authenticate_api_keyFunction · 0.85

Calls 15

server_configFunction · 0.90
zen_storeFunction · 0.90
get_zenml_headersFunction · 0.90
UserFilterClass · 0.90
UserUpdateClass · 0.90
UserRequestClass · 0.90
AnalyticsContextClass · 0.90
AuthContextClass · 0.85

Tested by

no test coverage detected