Async version of RPC call. Args: method_name: Method name to call; *args: Positional arguments; **kwargs: Keyword arguments; __rpc_params: RPCParams object containing RPC parameters. Returns: The result of the remote method call.
(self, method_name, *args, **kwargs)
| 441 | raise |
| 442 | |
async def _call_async(self, method_name, *args, **kwargs):
    """Async version of RPC call.

    Builds an ``RPCRequest`` with a fresh request id, sends it through the
    client socket and, when a response is required, waits on a future that
    is registered in ``self._pending_futures`` under that request id.

    Args:
        method_name: Method name to call
        *args: Positional arguments
        **kwargs: Keyword arguments
        __rpc_params: RPCParams object containing RPC parameters
            (``need_response`` and ``timeout`` are read from it). It is
            removed from ``kwargs`` before the request is built. When its
            ``timeout`` is None, ``self._timeout`` is used instead.

    Returns:
        The result of the remote method call, or None when the RPC
        parameters state that no response is needed.

    Raises:
        RPCCancelled: If the client already considers the server stopped,
            or if waiting for the response raises RPCCancelled (in which
            case the client is also marked as stopped).
        RPCTimeout: If no response is received within the timeout.
    """
    if enable_llmapi_debug() or logger.level == 'debug':
        logger_debug(
            f"[client] [{datetime.now().isoformat()}] RPC client calling method: {method_name}"
        )
    nvtx_mark_debug(f"RPC.async.{method_name}",
                    color="yellow",
                    category="RPC")
    if self._server_stopped:
        raise RPCCancelled("Server is shutting down, request cancelled")

    rpc_params = kwargs.pop("__rpc_params", RPCParams())
    need_response = rpc_params.need_response
    timeout = rpc_params.timeout if rpc_params.timeout is not None else self._timeout

    request_id = uuid.uuid4().hex
    request = RPCRequest(request_id,
                         method_name=method_name,
                         args=args,
                         kwargs=kwargs,
                         need_response=need_response,
                         timeout=timeout)

    if not need_response:
        # Fire-and-forget: nothing to wait for, so no future is registered.
        await self._client_socket.put_async(request)
        logger_debug(f"[client] RPC Client sent request: {request}")
        return None

    # Register the future BEFORE sending. Awaiting the send yields control,
    # so a response that arrives quickly could otherwise be processed while
    # no future exists for this request id and be lost.
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    self._pending_futures[request_id] = (future, loop)

    try:
        # The send stays outside the inner try so that errors raised while
        # sending are propagated unchanged, exactly as before.
        await self._client_socket.put_async(request)
        logger_debug(f"[client] RPC Client sent request: {request}")

        try:
            # On timeout the remote call is expected to return a timeout
            # error in a timely manner.
            # NOTE(review): earlier comments here mentioned adding 1 second
            # of slack so the client can still receive that error, but the
            # code waits for exactly `timeout`; timing is left unchanged.
            if timeout is None:
                return await future
            return await asyncio.wait_for(future, timeout)
        except RPCCancelled:
            # Remember the shutdown so later calls fail fast.
            self._server_stopped = True
            raise
        except asyncio.TimeoutError as err:
            raise RPCTimeout(
                f"Request '{method_name}' timed out after {timeout}s") from err
    finally:
        # Always drop the bookkeeping entry so timed-out, cancelled or
        # failed requests do not accumulate. The default makes this a no-op
        # if the entry has already been removed elsewhere.
        # NOTE(review): assumes the response handler tolerates a missing
        # entry for late responses -- confirm.
        self._pending_futures.pop(request_id, None)
no test coverage detected