MCPcopy
hub / github.com/NVIDIA/TensorRT-LLM / _call_streaming

Method _call_streaming

tensorrt_llm/executor/rpc/rpc_client.py:563–658  ·  view source on GitHub ↗

Call a remote async generator method and get streaming results. Implementation note: The outgoing request is sent on the RPCClient’s private event-loop to obey the single-loop rule. The returned items are yielded in the caller’s loop via AsyncQueue, which is thread-safe.

(self, name: str, *args, **kwargs) -> AsyncIterator[Any]

Source from the content-addressed store, hash-verified

561 return self._executor.submit(_async_to_sync)
562
563 async def _call_streaming(self, name: str, *args,
564 **kwargs) -> AsyncIterator[Any]:
565 """
566 Call a remote async generator method and get streaming results.
567
568 Implementation note: The outgoing request is sent on the RPCClient’s
569 private event-loop to obey the single-loop rule. The returned items
570 are yielded in the caller’s loop via AsyncQueue, which is thread-safe.
571 """
572 nvtx_mark_debug(f"RPC.streaming.{name}", color="red", category="RPC")
573
574 if self._server_stopped:
575 raise RPCCancelled("Server is shutting down, request cancelled")
576 rpc_params = kwargs.pop("__rpc_params", RPCParams())
577 timeout = rpc_params.timeout if rpc_params.timeout is not None else self._timeout
578
579 request_id = uuid.uuid4().hex
580 # Use AsyncQueue to ensure proper cross-thread communication
581 queue = AsyncQueue()
582 # Recreate sync_q with the current running loop for proper cross-thread communication
583 queue._sync_q = _SyncQueue(queue, asyncio.get_running_loop())
584
585 # Register queue with lock to ensure thread-safe access
586 with self._streaming_queues_lock:
587 self._streaming_queues[request_id] = queue
588 #logger_debug(f"[{datetime.now().isoformat()}] Registered streaming queue for request_id={request_id}")
589
590 # Build the RPCRequest object here – it's pickle-able and small – but
591 # *do not* touch the ZeroMQ socket from this (caller) event-loop.
592 request = RPCRequest(request_id,
593 method_name=name,
594 args=args,
595 kwargs=kwargs,
596 need_response=True,
597 timeout=timeout,
598 is_streaming=True)
599
600 # Send the request on the RPCClient's dedicated loop to guarantee that
601 # **all** socket I/O happens from exactly one thread / loop.
602 async def _send_streaming_request(req: RPCRequest):
603 """Private helper executed in the client loop to put the request."""
604 logger_debug(
605 f"[client] [{datetime.now().isoformat()}] Sending streaming request: {req.method_name}, request_id={req.request_id}"
606 )
607 await self._client_socket.put_async(req)
608 logger_debug(
609 f"[client][{datetime.now().isoformat()}] Streaming request sent successfully: {req.method_name}, request_id={req.request_id}"
610 )
611
612 send_future = asyncio.run_coroutine_threadsafe(
613 _send_streaming_request(request), self._loop)
614
615 # Wait until the request is actually on the wire before entering the
616 # user-visible streaming loop. We wrap the concurrent.futures.Future so
617 # we can await it in the caller's asyncio context.
618 await asyncio.wrap_future(send_future)
619
620 try:

Callers

nothing calls this directly

Calls 12

get · Method · 0.95
nvtx_mark_debug · Function · 0.90
RPCCancelled · Class · 0.85
RPCParams · Class · 0.85
AsyncQueue · Class · 0.85
_SyncQueue · Class · 0.85
RPCRequest · Class · 0.85
logger_debug · Function · 0.85
enable_llmapi_debug · Function · 0.85
RPCStreamingError · Class · 0.85
RPCTimeout · Class · 0.85
pop · Method · 0.80

Tested by

no test coverage detected