MCPcopy Index your code
hub / github.com/su-kaka/gcli2api / Credentials

Class Credentials

src/google_oauth_api.py:30–167  ·  view source on GitHub ↗

凭证类

Source from the content-addressed store, hash-verified

28
29
30class Credentials:
31 """凭证类"""
32
33 def __init__(
34 self,
35 access_token: str,
36 refresh_token: str = None,
37 client_id: str = None,
38 client_secret: str = None,
39 expires_at: datetime = None,
40 project_id: str = None,
41 ):
42 self.access_token = access_token
43 self.refresh_token = refresh_token
44 self.client_id = client_id
45 self.client_secret = client_secret
46 self.expires_at = expires_at
47 self.project_id = project_id
48
49 # 反代配置将在使用时异步获取
50 self.oauth_base_url = None
51 self.token_endpoint = None
52
def is_expired(self) -> bool:
    """Report whether the access token is expired or about to expire.

    The token counts as expired three minutes before ``expires_at``.

    Returns:
        True when ``expires_at`` is unset, or when the expiry time minus
        the three-minute buffer has been reached; False otherwise.
    """
    if not self.expires_at:
        return True

    expires_at = self.expires_at
    if expires_at.utcoffset() is None:
        # A naive timestamp cannot be compared with the timezone-aware
        # "now" below (that raises TypeError), so interpret it as UTC.
        expires_at = expires_at.replace(tzinfo=timezone.utc)

    # Treat the token as expired 3 minutes early.
    buffer = timedelta(minutes=3)
    return (expires_at - buffer) <= datetime.now(timezone.utc)
61
async def refresh_if_needed(self) -> bool:
    """Refresh the access token only when it is expired.

    Returns:
        True if a refresh was performed, False if the token is still valid.

    Raises:
        TokenError: The token is expired but no refresh token is available.
    """
    if self.is_expired():
        if not self.refresh_token:
            raise TokenError("需要刷新令牌但未提供")
        await self.refresh()
        return True

    return False
72
73 async def refresh(self):
74 """刷新访问令牌"""
75 if not self.refresh_token:
76 raise TokenError("无刷新令牌")
77
78 data = {
79 "client_id": self.client_id,
80 "client_secret": self.client_secret,
81 "refresh_token": self.refresh_token,
82 "grant_type": "refresh_token",
83 }
84
85 try:
86 oauth_base_url = await get_oauth_proxy_url()
87 token_url = f"{oauth_base_url.rstrip('/')}/token"

Callers 1

exchange_codeMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected