Stream a request to a worker and yield response chunks.

Yields tuples of (chunk, headers, status).

Automatically handles:
- URL construction (advertise_address or ip + port)
- Proxy selection (based on use_proxy_env_for_url)
- Authorization header

Args: worker: Target worker (the remaining arguments are described in the full docstring in the listing below).
(
worker: Worker,
method: Literal["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"],
path: str,
proxy_client: Optional[aiohttp.ClientSession] = None,
no_proxy_client: Optional[aiohttp.ClientSession] = None,
params: Optional[Dict] = None,
data: Optional[Union[bytes, AsyncIterator[bytes], aiohttp.FormData]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[aiohttp.ClientTimeout] = None,
on_exception: Optional[
Callable[[Exception, aiohttp.ClientTimeout], Tuple[str, int]]
] = None,
raw: bool = False,
)
| 212 | |
| 213 | |
| 214 | async def stream_to_worker( |
| 215 | worker: Worker, |
| 216 | method: Literal["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"], |
| 217 | path: str, |
| 218 | proxy_client: Optional[aiohttp.ClientSession] = None, |
| 219 | no_proxy_client: Optional[aiohttp.ClientSession] = None, |
| 220 | params: Optional[Dict] = None, |
| 221 | data: Optional[Union[bytes, AsyncIterator[bytes], aiohttp.FormData]] = None, |
| 222 | headers: Optional[Dict[str, str]] = None, |
| 223 | timeout: Optional[aiohttp.ClientTimeout] = None, |
| 224 | on_exception: Optional[ |
| 225 | Callable[[Exception, aiohttp.ClientTimeout], Tuple[str, int]] |
| 226 | ] = None, |
| 227 | raw: bool = False, |
| 228 | ) -> AsyncGenerator[Tuple[Union[bytes, str], dict, int], None]: |
| 229 | """ |
| 230 | Stream a request to a worker and yield response chunks. |
| 231 | |
| 232 | Yields tuples of (chunk, headers, status). |
| 233 | |
| 234 | Automatically handles: |
| 235 | - URL construction (advertise_address or ip + port) |
| 236 | - Proxy selection (based on use_proxy_env_for_url) |
| 237 | - Authorization header |
| 238 | |
| 239 | Args: |
| 240 | worker: Target worker |
| 241 | method: HTTP method |
| 242 | path: API path |
| 243 | proxy_client: HTTP client with proxy |
| 244 | no_proxy_client: HTTP client without proxy |
| 245 | params: Query parameters |
| 246 | data: Bytes, async iterator of bytes, or FormData |
| 247 | headers: Additional headers |
| 248 | timeout: Request timeout |
| 249 | on_exception: Optional callback(exception, timeout) -> (error_msg, status_code). |
| 250 | Called when an exception occurs during streaming. If not provided, |
| 251 | the exception is raised. |
| 252 | raw: If True, yield raw bytes without SSE line formatting (use for log streams). |
| 253 | If False (default), format each line as SSE (use for OpenAI-compatible streams). |
| 254 | """ |
| 255 | try: |
| 256 | async with _request_to_worker( |
| 257 | worker=worker, |
| 258 | method=method, |
| 259 | path=path, |
| 260 | proxy_client=proxy_client, |
| 261 | no_proxy_client=no_proxy_client, |
| 262 | params=params, |
| 263 | data=data, |
| 264 | headers=headers, |
| 265 | timeout=timeout, |
| 266 | raise_on_error=False, |
| 267 | ) as resp: |
| 268 | if resp.status >= 400: |
| 269 | body = await resp.read() |
| 270 | yield body, dict(resp.headers), resp.status |
| 271 | return |
no test coverage detected