Proxy a request to the Kubernetes API server of a Kubernetes-provider cluster by forwarding it through one of the cluster's worker pods. The worker uses its in-pod ServiceAccount credentials to call the API server. Uses an inline session instead of SessionDep so the session is released immediately after the initial lookup, preventing long-lived Kubernetes watch/log streams from holding a database connection.
(
request: Request,
id: int,
path: str,
)
| 836 | include_in_schema=False, |
| 837 | ) |
| 838 | async def cluster_apiserver_proxy( |
| 839 | request: Request, |
| 840 | id: int, |
| 841 | path: str, |
| 842 | ): |
| 843 | """ |
| 844 | Proxy a request to the Kubernetes API server of a Kubernetes-provider |
| 845 | cluster, by forwarding it through one of the cluster's worker pods. The |
| 846 | worker uses its in-pod ServiceAccount credentials to call the API server. |
| 847 | |
| 848 | Uses an inline session instead of SessionDep so the session is released |
| 849 | immediately after the initial lookup, preventing long-lived Kubernetes |
| 850 | watch/log streams from holding a database connection. |
| 851 | """ |
| 852 | async with async_session() as session: |
| 853 | cluster = await Cluster.one_by_id(session, id) |
| 854 | if not cluster or cluster.deleted_at is not None: |
| 855 | raise NotFoundException(message=f"cluster {id} not found") |
| 856 | if cluster.provider != ClusterProvider.Kubernetes: |
| 857 | raise InvalidException( |
| 858 | message=( |
| 859 | f"cluster {cluster.name}(id: {id}) provider is " |
| 860 | f"{cluster.provider.value}; API server proxy is only supported " |
| 861 | "for Kubernetes-provider clusters." |
| 862 | ) |
| 863 | ) |
| 864 | |
| 865 | workers = await Worker.all_by_fields( |
| 866 | session, |
| 867 | fields={"cluster_id": id, "state": WorkerStateEnum.READY}, |
| 868 | ) |
| 869 | if not workers: |
| 870 | raise ServiceUnavailableException( |
| 871 | message=f"No reachable workers in cluster {cluster.name}(id: {id})" |
| 872 | ) |
| 873 | worker = random.choice(workers) |
| 874 | session.expunge(worker) |
| 875 | |
| 876 | headers = { |
| 877 | k: v |
| 878 | for k, v in request.headers.items() |
| 879 | if k.lower() not in _CLUSTER_PROXY_REQUEST_HEADER_SKIP |
| 880 | } |
| 881 | |
| 882 | body = None |
| 883 | if request.method not in ("GET", "HEAD", "OPTIONS"): |
| 884 | body = await request.body() |
| 885 | |
| 886 | # request.query_params preserves order but a flat dict is sufficient for |
| 887 | # the Kubernetes API surface we forward (no duplicate keys in practice). |
| 888 | params = dict(request.query_params) or None |
| 889 | |
| 890 | # No total timeout — Kubernetes watch and log-follow streams may be open |
| 891 | # indefinitely. Connect timeout still bounds the upstream connect step. |
| 892 | timeout = aiohttp.ClientTimeout(total=None, sock_connect=10) |
| 893 | |
| 894 | return StreamingResponseWithStatusCode( |
| 895 | stream_to_worker( |
nothing calls this directly
no test coverage detected