MCPcopy
hub / github.com/NVIDIA/TensorRT-LLM / _call_async

Method _call_async

tensorrt_llm/executor/rpc/rpc_client.py:443–504  ·  view source on GitHub ↗

Async version of RPC call. Args: method_name: Method name to call; *args: Positional arguments; **kwargs: Keyword arguments; __rpc_params: RPCParams object containing RPC parameters. Returns: The result of the remote method call.

(self, method_name, *args, **kwargs)

Source from the content-addressed store, hash-verified

441 raise
442
443 async def _call_async(self, method_name, *args, **kwargs):
444 """Async version of RPC call.
445 Args:
446 method_name: Method name to call
447 *args: Positional arguments
448 **kwargs: Keyword arguments
449 __rpc_params: RPCParams object containing RPC parameters.
450
451 Returns:
452 The result of the remote method call
453 """
454 if enable_llmapi_debug() or logger.level == 'debug':
455 logger_debug(
456 f"[client] [{datetime.now().isoformat()}] RPC client calling method: {method_name}"
457 )
458 nvtx_mark_debug(f"RPC.async.{method_name}",
459 color="yellow",
460 category="RPC")
461 if self._server_stopped:
462 raise RPCCancelled("Server is shutting down, request cancelled")
463
464 rpc_params = kwargs.pop("__rpc_params", RPCParams())
465 need_response = rpc_params.need_response
466 timeout = rpc_params.timeout if rpc_params.timeout is not None else self._timeout
467
468 request_id = uuid.uuid4().hex
469 request = RPCRequest(request_id,
470 method_name=method_name,
471 args=args,
472 kwargs=kwargs,
473 need_response=need_response,
474 timeout=timeout)
475 await self._client_socket.put_async(request)
476 logger_debug(f"[client] RPC Client sent request: {request}")
477
478 if not need_response:
479 return None
480
481 loop = asyncio.get_running_loop()
482 future = loop.create_future()
483 self._pending_futures[request_id] = (future, loop)
484
485 try:
486 # If timeout, the remote call should return a timeout error timely,
487 # so we add 1 second to the timeout to ensure the client can get
488 # that result.
489 if timeout is None:
490 res = await future
491 else:
492 # Add 1 second to the timeout to ensure the client can get
493 res = await asyncio.wait_for(future, timeout)
494 return res
495 except RPCCancelled:
496 self._server_stopped = True
497 raise
498 except asyncio.TimeoutError:
499 raise RPCTimeout(
500 f"Request '{method_name}' timed out after {timeout}s")

Callers 2

_call_syncMethod · 0.95
_async_to_syncMethod · 0.95

Calls 9

nvtx_mark_debugFunction · 0.90
enable_llmapi_debugFunction · 0.85
logger_debugFunction · 0.85
RPCCancelledClass · 0.85
RPCParamsClass · 0.85
RPCRequestClass · 0.85
RPCTimeoutClass · 0.85
popMethod · 0.80
put_asyncMethod · 0.80

Tested by

no test coverage detected