MCPcopy Index your code
hub / github.com/NVIDIA/TensorRT-LLM / RPCClient

Class RPCClient

tensorrt_llm/executor/rpc/rpc_client.py:80–720  ·  view source on GitHub ↗

An RPC Client that connects to the RPCServer.

Source from the content-addressed store, hash-verified

78
79
80class RPCClient:
81 """
82 An RPC Client that connects to the RPCServer.
83 """
84
def __init__(self,
             address: str,
             hmac_key=None,
             timeout: Optional[float] = None,
             num_workers: int = 4):
    """Create the client socket, worker pool and background event loop.

    Args:
        address: The ZMQ address to connect to.
        hmac_key: The HMAC key. HMAC is switched on for the socket
            whenever this is not None.
        timeout: The timeout (seconds) for RPC calls.
        num_workers: The maximum number of threads in the client's
            worker pool.
    """
    self._address = address
    self._timeout = timeout

    # Any value other than '0' in TLLM_LLMAPI_ZMQ_PAIR selects a PAIR
    # socket; a DEALER socket is used otherwise.
    pair_mode_enabled = os.environ.get('TLLM_LLMAPI_ZMQ_PAIR', '0') != '0'
    if pair_mode_enabled:
        socket_type = zmq.PAIR
        logger_debug(
            "[client] Using zmq.PAIR socket type for RPC communication")
    else:
        socket_type = zmq.DEALER

    hmac_enabled = hmac_key is not None
    self._client_socket = ZeroMqQueue(
        address=(address, hmac_key),
        is_server=False,
        is_async=True,
        use_hmac_encryption=hmac_enabled,
        socket_type=socket_type,
        name="rpc_client",
    )

    # Request bookkeeping.
    self._pending_futures = {}
    # Maps request_id to the queue that receives its streaming responses.
    self._streaming_queues: Dict[str, AsyncQueue] = {}
    # Guards `_streaming_queues` against cross-thread access.
    self._streaming_queues_lock = threading.RLock()
    self._reader_task = None

    self._executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=num_workers,
        thread_name_prefix="rpc_client_worker",
    )

    # Lifecycle flags.
    self._server_stopped = False
    self._closed = False

    # Background event loop state.
    self._loop = None
    self._loop_thread = None
    # The asyncio task is tracked so that it can be cancelled properly.
    self._reader_asyncio_task = None
    # Protects event loop initialization.
    self._loop_lock = threading.Lock()

    # Create the background event loop eagerly so every later RPC call
    # (sync or streaming) can rely on it. The loop used to be created
    # lazily by the first streaming call, which raced with immediate
    # fire-and-forget calls such as `submit()`.
    self._ensure_event_loop()
137

Calls

no outgoing calls