凭证类
| 28 | |
| 29 | |
| 30 | class Credentials: |
| 31 | """凭证类""" |
| 32 | |
def __init__(
    self,
    access_token: str,
    refresh_token: str = None,
    client_id: str = None,
    client_secret: str = None,
    expires_at: datetime = None,
    project_id: str = None,
):
    """Store the token material and client settings on the instance.

    Args:
        access_token: Current access token.
        refresh_token: Token used to request a new access token, if any.
        client_id: Client identifier included in the refresh payload.
        client_secret: Client secret included in the refresh payload.
        expires_at: Expiry time of the access token; ``None`` if unknown.
        project_id: Associated project identifier, if any.
    """
    self.access_token, self.refresh_token = access_token, refresh_token
    self.client_id, self.client_secret = client_id, client_secret
    self.expires_at, self.project_id = expires_at, project_id

    # Reverse-proxy settings start unset; they are fetched asynchronously
    # at the point of use.
    self.oauth_base_url = self.token_endpoint = None
| 52 | |
def is_expired(self, buffer_minutes: float = 3) -> bool:
    """Report whether the access token should be treated as expired.

    The token is considered expired slightly ahead of ``expires_at``.

    Args:
        buffer_minutes: Margin, in minutes, subtracted from ``expires_at``
            before it is compared with the current UTC time. Defaults to
            3, which matches the previously hard-coded margin.

    Returns:
        ``True`` when ``expires_at`` is unset, or when the current UTC time
        has reached ``expires_at`` minus the margin; ``False`` otherwise.

    Note:
        "Now" is timezone-aware (UTC), so ``expires_at`` must be
        timezone-aware too; Python raises ``TypeError`` when a naive and an
        aware datetime are compared.
    """
    if not self.expires_at:
        # No known expiry: treat the token as expired.
        return True

    margin = timedelta(minutes=buffer_minutes)
    return (self.expires_at - margin) <= datetime.now(timezone.utc)
| 61 | |
async def refresh_if_needed(self) -> bool:
    """Refresh the access token only when it is considered expired.

    Returns:
        ``True`` if a refresh was performed, ``False`` if the token was
        still valid and nothing was done.

    Raises:
        TokenError: The token is expired but no refresh token is set.
    """
    if self.is_expired():
        if not self.refresh_token:
            raise TokenError("需要刷新令牌但未提供")
        await self.refresh()
        return True
    return False
| 72 | |
| 73 | async def refresh(self): |
| 74 | """刷新访问令牌""" |
| 75 | if not self.refresh_token: |
| 76 | raise TokenError("无刷新令牌") |
| 77 | |
| 78 | data = { |
| 79 | "client_id": self.client_id, |
| 80 | "client_secret": self.client_secret, |
| 81 | "refresh_token": self.refresh_token, |
| 82 | "grant_type": "refresh_token", |
| 83 | } |
| 84 | |
| 85 | try: |
| 86 | oauth_base_url = await get_oauth_proxy_url() |
| 87 | token_url = f"{oauth_base_url.rstrip('/')}/token" |