MCPcopy
hub / github.com/gpustack/gpustack / cluster_apiserver_proxy

Function cluster_apiserver_proxy

gpustack/routes/clusters.py:838–907  ·  view source on GitHub ↗

Proxy a request to the Kubernetes API server of a Kubernetes-provider cluster, by forwarding it through one of the cluster's worker pods. The worker uses its in-pod ServiceAccount credentials to call the API server. Uses an inline session instead of SessionDep so the session is released immediately after the initial lookup, preventing long-lived Kubernetes watch/log streams from holding a database connection.

(
    request: Request,
    id: int,
    path: str,
)

Source from the content-addressed store, hash-verified

836 include_in_schema=False,
837)
838async def cluster_apiserver_proxy(
839 request: Request,
840 id: int,
841 path: str,
842):
843 """
844 Proxy a request to the Kubernetes API server of a Kubernetes-provider
845 cluster, by forwarding it through one of the cluster's worker pods. The
846 worker uses its in-pod ServiceAccount credentials to call the API server.
847
848 Uses an inline session instead of SessionDep so the session is released
849 immediately after the initial lookup, preventing long-lived Kubernetes
850 watch/log streams from holding a database connection.
851 """
852 async with async_session() as session:
853 cluster = await Cluster.one_by_id(session, id)
854 if not cluster or cluster.deleted_at is not None:
855 raise NotFoundException(message=f"cluster {id} not found")
856 if cluster.provider != ClusterProvider.Kubernetes:
857 raise InvalidException(
858 message=(
859 f"cluster {cluster.name}(id: {id}) provider is "
860 f"{cluster.provider.value}; API server proxy is only supported "
861 "for Kubernetes-provider clusters."
862 )
863 )
864
865 workers = await Worker.all_by_fields(
866 session,
867 fields={"cluster_id": id, "state": WorkerStateEnum.READY},
868 )
869 if not workers:
870 raise ServiceUnavailableException(
871 message=f"No reachable workers in cluster {cluster.name}(id: {id})"
872 )
873 worker = random.choice(workers)
874 session.expunge(worker)
875
876 headers = {
877 k: v
878 for k, v in request.headers.items()
879 if k.lower() not in _CLUSTER_PROXY_REQUEST_HEADER_SKIP
880 }
881
882 body = None
883 if request.method not in ("GET", "HEAD", "OPTIONS"):
884 body = await request.body()
885
886 # request.query_params preserves order but a flat dict is sufficient for
887 # the Kubernetes API surface we forward (no duplicate keys in practice).
888 params = dict(request.query_params) or None
889
890 # No total timeout — Kubernetes watch and log-follow streams may be open
891 # indefinitely. Connect timeout still bounds the upstream connect step.
892 timeout = aiohttp.ClientTimeout(total=None, sock_connect=10)
893
894 return StreamingResponseWithStatusCode(
895 stream_to_worker(

Callers

nothing calls this directly

Calls 5

async_session — Function · 0.90
stream_to_worker — Function · 0.90
one_by_id — Method · 0.80
all_by_fields — Method · 0.80

Tested by

no test coverage detected