Asynchronously fetch authentication token. Args: api_key: The API key to authenticate with Returns: AuthTokenResponse containing token and project information, or None if failed
(self, api_key: str)
| 25 | super().__init__(endpoint) |
| 26 | |
async def fetch_auth_token(self, api_key: str) -> AuthTokenResponse:
    """
    Asynchronously fetch an authentication token.

    POSTs the API key to ``/v3/auth/token`` and hands back the response
    payload when it carries a token. This is a best-effort call: every
    failure is logged and reported to the caller as ``None`` rather than
    raised.

    Args:
        api_key: The API key to authenticate with

    Returns:
        The response payload containing the token and project information,
        or None if the request failed or the response held no token.
    """
    try:
        path = "/v3/auth/token"
        data = {"api_key": api_key}
        headers = self.prepare_headers()

        # Build full URL
        url = self._get_full_url(path)

        # Make async request
        response_data = await HttpClient.async_request(
            method="POST", url=url, data=data, headers=headers, timeout=30
        )

        # An empty/None response is handled like a missing token so it is
        # reported as an authentication failure instead of surfacing as an
        # AttributeError in the generic handler below.
        token = response_data.get("token") if response_data else None
        if not token:
            logger.warning("Authentication failed: Perhaps an invalid API key?")
            return None

        # Check project premium status
        if response_data.get("project_prem_status") != "pro":
            # NOTE(review): the message already embeds ANSI blue escape
            # codes and is additionally passed through colored(..., "blue");
            # one of the two looks redundant - confirm before changing the
            # emitted string.
            logger.info(
                colored(
                    "\x1b[34mYou're on the agentops free plan 🤔\x1b[0m",
                    "blue",
                )
            )

        return response_data

    except Exception as exc:
        # Keep the "None on failure" contract, but leave a trace of what
        # went wrong instead of discarding the error silently. The API key
        # itself is deliberately not included in the message.
        logger.warning("Failed to fetch authentication token: %s", exc)
        return None
| 68 | |
| 69 | # Add V3-specific API methods here |
no test coverage detected