An RPC Client that connects to the RPCServer.
| 78 | |
| 79 | |
| 80 | class RPCClient: |
| 81 | """ |
| 82 | An RPC Client that connects to the RPCServer. |
| 83 | """ |
| 84 | |
def __init__(self,
             address: str,
             hmac_key=None,
             timeout: Optional[float] = None,
             num_workers: int = 4):
    """Set up the client socket, worker pool and bookkeeping state.

    Args:
        address: ZMQ address to connect to.
        hmac_key: HMAC key passed to the socket wrapper together with the
            address; the HMAC option is switched on only when a key is
            supplied (i.e. it is not None).
        timeout: Timeout, in seconds, for RPC calls.
        num_workers: Maximum number of threads in the client's worker pool.
    """
    self._address = address
    self._timeout = timeout

    # TLLM_LLMAPI_ZMQ_PAIR selects the socket flavour: any value other than
    # the literal string '0' switches from the default DEALER to PAIR.
    pair_mode_enabled = os.environ.get('TLLM_LLMAPI_ZMQ_PAIR', '0') != '0'
    if pair_mode_enabled:
        socket_type = zmq.PAIR
        logger_debug(
            "[client] Using zmq.PAIR socket type for RPC communication")
    else:
        socket_type = zmq.DEALER

    hmac_enabled = hmac_key is not None
    self._client_socket = ZeroMqQueue(
        address=(address, hmac_key),
        is_server=False,
        is_async=True,
        use_hmac_encryption=hmac_enabled,
        socket_type=socket_type,
        name="rpc_client",
    )

    self._pending_futures = {}

    # request_id -> queue that receives the streaming responses for it.
    self._streaming_queues: Dict[str, AsyncQueue] = {}
    # Re-entrant lock guarding cross-thread access to the streaming queues.
    self._streaming_queues_lock = threading.RLock()

    self._reader_task = None
    self._executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=num_workers,
        thread_name_prefix="rpc_client_worker",
    )

    # Lifecycle flags.
    self._server_stopped = False
    self._closed = False

    # Background event-loop state; everything starts out unset.
    self._loop = None
    self._loop_thread = None
    # Handle on the reader's asyncio task, kept so it can be cancelled
    # properly.
    self._reader_asyncio_task = None
    # Serialises event-loop initialisation.
    self._loop_lock = threading.Lock()

    # Bring the background event loop up right away rather than lazily, so
    # every later RPC call (sync or streaming) can rely on it existing.
    # Lazy creation used to race between the first streaming call and
    # immediate fire-and-forget calls such as `submit()`.
    self._ensure_event_loop()
| 137 |
no outgoing calls