MCPcopy Index your code
hub / github.com/hunvreus/devpush / GitHubInstallationService

Class GitHubInstallationService

app/services/github_installation.py:9–42  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

7
8
class GitHubInstallationService:
    """Fetch GitHub installation records and keep their access tokens current."""

    def __init__(self, github_service: GitHubService):
        # Client used to request a new installation access token when needed.
        self.github_service = github_service

    async def get_or_refresh_installation(
        self, installation_id: int, db: AsyncSession
    ) -> GithubInstallation:
        """Return the installation for ``installation_id`` with a usable token.

        The record is looked up by ``installation_id``; when no row matches, a
        new ``GithubInstallation`` object is created for that id. A new token is
        requested when the record has no token, or when it has a stored expiry
        that is at or before the current UTC time. A record that has a token
        but no stored expiry is left untouched.

        The record is then merged into ``db`` and the session is committed.

        Args:
            installation_id: Installation identifier to look up; it is passed
                to the GitHub service as a string.
            db: Async session used for the lookup, merge and commit.

        Returns:
            The installation instance returned by ``db.merge``.
        """
        query = select(GithubInstallation).where(
            GithubInstallation.installation_id == installation_id
        )
        record = (await db.execute(query)).scalar_one_or_none()
        if not record:
            record = GithubInstallation(installation_id=installation_id)

        # Decide whether a new token is required: either none is stored, or the
        # stored (naive) expiry has been reached.
        needs_token = not record.token
        if not needs_token and record.token_expires_at:
            # Stored expiries are naive, so compare against a naive UTC "now".
            utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
            needs_token = record.token_expires_at <= utc_now

        if needs_token:
            payload = await self.github_service.get_installation_access_token(
                str(installation_id)
            )
            record.token = payload["token"]
            # Rewrite a trailing "Z" as an explicit offset so fromisoformat can
            # parse it, then drop tzinfo to store a naive datetime.
            expiry = datetime.fromisoformat(
                payload["expires_at"].replace("Z", "+00:00")
            )
            record.token_expires_at = expiry.replace(tzinfo=None)

        record = await db.merge(record)
        await db.commit()
        return record

Callers 1

Calls

no outgoing calls

Tested by

no test coverage detected