| 21 | |
| 22 | |
| 23 | class DeploymentService: |
| 24 | def __init__(self): |
| 25 | pass |
| 26 | |
| 27 | @staticmethod |
| 28 | async def update_status( |
| 29 | db: AsyncSession, |
| 30 | deployment: Deployment, |
| 31 | *, |
| 32 | status: str | None = None, |
| 33 | conclusion: str | None = None, |
| 34 | error: dict | None = None, |
| 35 | container_status: str | None = None, |
| 36 | redis_client: Redis | None = None, |
| 37 | emit: bool = True, |
| 38 | ) -> None: |
| 39 | now = datetime.now(timezone.utc) |
| 40 | if status is not None: |
| 41 | deployment.status = status |
| 42 | if conclusion is not None: |
| 43 | deployment.conclusion = conclusion |
| 44 | deployment.concluded_at = now.replace(tzinfo=None) |
| 45 | if deployment.project: |
| 46 | deployment.project.updated_at = now.replace(tzinfo=None) |
| 47 | if error is not None: |
| 48 | deployment.error = error |
| 49 | if container_status is not None: |
| 50 | deployment.container_status = container_status |
| 51 | |
| 52 | await db.commit() |
| 53 | |
| 54 | if emit and redis_client and (status or conclusion): |
| 55 | status_value = conclusion if conclusion else status |
| 56 | fields = { |
| 57 | "event_type": "deployment_status_update", |
| 58 | "project_id": deployment.project_id, |
| 59 | "deployment_id": deployment.id, |
| 60 | "deployment_status": status_value, |
| 61 | "timestamp": now.isoformat(), |
| 62 | } |
| 63 | await redis_client.xadd( |
| 64 | f"stream:project:{deployment.project_id}:deployment:{deployment.id}:status", |
| 65 | fields, |
| 66 | ) |
| 67 | await redis_client.xadd( |
| 68 | f"stream:project:{deployment.project_id}:updates", fields |
| 69 | ) |
| 70 | |
| 71 | def get_alias_domains( |
| 72 | self, deployment: Deployment, settings: Settings |
| 73 | ) -> dict[str, str]: |
| 74 | project = deployment.project |
| 75 | values: dict[str, str] = {} |
| 76 | |
| 77 | if deployment.branch: |
| 78 | sanitized_branch = re.sub(r"[^a-zA-Z0-9-]", "-", deployment.branch).lower() |
| 79 | if sanitized_branch: |
| 80 | branch_subdomain = f"{project.slug}-branch-{sanitized_branch}" |
no outgoing calls
no test coverage detected