hub / github.com/gpustack/gpustack / stream_to_worker

Function stream_to_worker

gpustack/server/worker_request.py:214–287

Stream a request to a worker and yield response chunks. Yields tuples of (chunk, headers, status).

Automatically handles:
- URL construction (advertise_address or ip + port)
- Proxy selection (based on use_proxy_env_for_url)
- Authorization header
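As a rough illustration of how a caller might consume the (chunk, headers, status) tuples, the sketch below drains an async generator of that shape. `fake_stream_to_worker` and `collect_stream` are hypothetical names invented for this example; the stand-in generator only mimics the tuple shape and is not the real function.

```python
import asyncio
from typing import AsyncGenerator, Dict, List, Optional, Tuple, Union

Chunk = Union[bytes, str]


async def fake_stream_to_worker() -> AsyncGenerator[Tuple[Chunk, dict, int], None]:
    """Hypothetical stand-in that mimics the (chunk, headers, status) tuples."""
    headers = {"Content-Type": "text/event-stream"}
    for chunk in ("data: hello\n\n", "data: world\n\n"):
        yield chunk, headers, 200


async def collect_stream(stream) -> Tuple[Optional[int], Dict[str, str], List[Chunk]]:
    """Drain a stream, keeping status and headers from the first tuple."""
    status: Optional[int] = None
    headers: Dict[str, str] = {}
    chunks: List[Chunk] = []
    async for chunk, hdrs, code in stream:
        if status is None:
            # Every tuple carries headers and status; the first one is enough.
            status, headers = code, dict(hdrs)
        chunks.append(chunk)
    return status, headers, chunks


def run_demo():
    return asyncio.run(collect_stream(fake_stream_to_worker()))
```

A real caller would more likely forward each chunk as it arrives rather than buffer the whole stream; buffering is used here only to keep the example small.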

(
    worker: Worker,
    method: Literal["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"],
    path: str,
    proxy_client: Optional[aiohttp.ClientSession] = None,
    no_proxy_client: Optional[aiohttp.ClientSession] = None,
    params: Optional[Dict] = None,
    data: Optional[Union[bytes, AsyncIterator[bytes], aiohttp.FormData]] = None,
    headers: Optional[Dict[str, str]] = None,
    timeout: Optional[aiohttp.ClientTimeout] = None,
    on_exception: Optional[
        Callable[[Exception, aiohttp.ClientTimeout], Tuple[str, int]]
    ] = None,
    raw: bool = False,
)
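The `on_exception` parameter expects a callable that takes the exception and the timeout and returns an (error_msg, status_code) pair. A minimal sketch of such a callback follows. The name `handle_stream_error`, the 504/502 mapping, and the message wording are assumptions made for illustration, and a `SimpleNamespace` stands in for `aiohttp.ClientTimeout` so the example runs without aiohttp installed.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Tuple


def handle_stream_error(exc: Exception, timeout: Any) -> Tuple[str, int]:
    """Map an exception to (error_msg, status_code); the mapping is illustrative."""
    # aiohttp.ClientTimeout exposes a `total` attribute; read it defensively.
    total = getattr(timeout, "total", None)
    if isinstance(exc, asyncio.TimeoutError):
        return f"Worker request timed out after {total} seconds", 504
    return f"Worker request failed: {exc}", 502


# Stand-in for aiohttp.ClientTimeout(total=30), used only in this sketch.
fake_timeout = SimpleNamespace(total=30)
timeout_result = handle_stream_error(asyncio.TimeoutError(), fake_timeout)
generic_result = handle_stream_error(ConnectionError("refused"), fake_timeout)
```

Per the docstring, the exception is raised instead when no callback is provided.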

Source from the content-addressed store, hash-verified

async def stream_to_worker(
    worker: Worker,
    method: Literal["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"],
    path: str,
    proxy_client: Optional[aiohttp.ClientSession] = None,
    no_proxy_client: Optional[aiohttp.ClientSession] = None,
    params: Optional[Dict] = None,
    data: Optional[Union[bytes, AsyncIterator[bytes], aiohttp.FormData]] = None,
    headers: Optional[Dict[str, str]] = None,
    timeout: Optional[aiohttp.ClientTimeout] = None,
    on_exception: Optional[
        Callable[[Exception, aiohttp.ClientTimeout], Tuple[str, int]]
    ] = None,
    raw: bool = False,
) -> AsyncGenerator[Tuple[Union[bytes, str], dict, int], None]:
    """
    Stream a request to a worker and yield response chunks.

    Yields tuples of (chunk, headers, status).

    Automatically handles:
    - URL construction (advertise_address or ip + port)
    - Proxy selection (based on use_proxy_env_for_url)
    - Authorization header

    Args:
        worker: Target worker
        method: HTTP method
        path: API path
        proxy_client: HTTP client with proxy
        no_proxy_client: HTTP client without proxy
        params: Query parameters
        data: Bytes, async iterator of bytes, or FormData
        headers: Additional headers
        timeout: Request timeout
        on_exception: Optional callback(exception, timeout) -> (error_msg, status_code).
            Called when an exception occurs during streaming. If not provided,
            the exception is raised.
        raw: If True, yield raw bytes without SSE line formatting (use for log streams).
            If False (default), format each line as SSE (use for OpenAI-compatible streams).
    """
    try:
        async with _request_to_worker(
            worker=worker,
            method=method,
            path=path,
            proxy_client=proxy_client,
            no_proxy_client=no_proxy_client,
            params=params,
            data=data,
            headers=headers,
            timeout=timeout,
            raise_on_error=False,
        ) as resp:
            if resp.status >= 400:
                body = await resp.read()
                yield body, dict(resp.headers), resp.status
                return
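The error branch shown in the excerpt (read the whole body, yield one tuple, then return) can be tried in isolation with the sketch below. `FakeResponse`, `stream_or_error`, and `drain` are hypothetical names for this example. The success branch is an assumption, since the excerpt does not show how the real function streams a successful response.

```python
import asyncio
from typing import Dict, List


class FakeResponse:
    """Hypothetical stand-in for the response object; not aiohttp's class."""

    def __init__(self, status: int, headers: Dict[str, str], chunks: List[bytes]):
        self.status = status
        self.headers = headers
        self._chunks = chunks

    async def read(self) -> bytes:
        return b"".join(self._chunks)

    async def iter_chunks(self):
        for chunk in self._chunks:
            yield chunk


async def stream_or_error(resp):
    if resp.status >= 400:
        # Error path from the excerpt: one tuple with the full body, then stop.
        body = await resp.read()
        yield body, dict(resp.headers), resp.status
        return
    # Assumed success path: pass each chunk through with headers and status.
    async for chunk in resp.iter_chunks():
        yield chunk, dict(resp.headers), resp.status


async def drain(resp):
    return [item async for item in stream_or_error(resp)]
```

With this shape, a caller sees an upstream error as a single ordinary tuple whose status is 400 or higher, so it can decide how to surface the error body instead of catching an exception.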

Callers 4

cluster_apiserver_proxy · Function · 0.90
get_benchmark_logs · Function · 0.90
_stream_response · Function · 0.90
get_serving_logs · Function · 0.90

Calls 5

_request_to_worker · Function · 0.85
_stream_response_chunks · Function · 0.85
error · Method · 0.80
on_exception · Function · 0.50
read · Method · 0.45

Tested by

no test coverage detected