
Class RpcWorker

tensorrt_llm/executor/rpc_worker.py:24–179 (github.com/NVIDIA/TensorRT-LLM)

An RPC wrapper for the `BaseWorker` class. Actions: `setup_engine`, `submit`, `fetch_responses`, `fetch_stats`, `fetch_kv_cache_events`, and `shutdown`.
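The summary describes `RpcWorker` as a worker whose operations are exposed as named actions. The page does not show how `RpcWorkerMixin` registers them, so the following is only a minimal in-process sketch of the general idea: a dispatcher that maps a whitelist of action names to bound methods. `ActionDispatcher` and `ToyWorker` are hypothetical names, there is no network transport, and this is not TensorRT-LLM's RPC layer.

```python
from typing import Any, Callable, Dict, List


class ActionDispatcher:
    """Hypothetical sketch: route action names to bound methods of a worker."""

    def __init__(self, worker: Any, actions: List[str]) -> None:
        # Expose only the whitelisted action names, not every attribute.
        self._handlers: Dict[str, Callable[..., Any]] = {
            name: getattr(worker, name) for name in actions
        }

    def call(self, action: str, *args: Any, **kwargs: Any) -> Any:
        if action not in self._handlers:
            raise ValueError(f"unknown action: {action}")
        return self._handlers[action](*args, **kwargs)


class ToyWorker:
    """Hypothetical stand-in for a real worker; methods are placeholders."""

    def __init__(self) -> None:
        self.pending: List[str] = []
        self.running = True

    def submit(self, request: str) -> int:
        # Queue the request and report how many are pending.
        self.pending.append(request)
        return len(self.pending)

    def fetch_responses(self) -> List[str]:
        # "Complete" every pending request and clear the queue.
        out, self.pending = [f"done:{r}" for r in self.pending], []
        return out

    def shutdown(self) -> None:
        self.running = False


worker = ToyWorker()
rpc = ActionDispatcher(worker, ["submit", "fetch_responses", "shutdown"])
rpc.call("submit", "hello")
print(rpc.call("fetch_responses"))  # ['done:hello']
rpc.call("shutdown")
```

Whitelisting names up front means a remote caller cannot invoke arbitrary attributes of the worker; anything outside the list raises `ValueError`.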


class RpcWorker(RpcWorkerMixin, BaseWorker):
    """
    An RPC wrapper for the BaseWorker class.

    Actions:
        - `setup_engine`: Set up the engine.
        - `submit`: Submit a request to the worker.
        - `fetch_responses`: Fetch the latest responses from the engine.
        - `fetch_stats`: Fetch the latest stats from the engine.
        - `fetch_kv_cache_events`: Fetch the latest KV cache events from the engine.
        - `shutdown`: Shut down the worker.
    """

    # Default number of RPC server workers
    # Increased to handle concurrent requests and prevent thread pool exhaustion
    # Need enough workers for: submit requests + fetch_responses + other operations
    # Can be overridden via constructor parameter
    DEFAULT_NUM_WORKERS = 32

    # Default timeout for fetch_responses in seconds
    # This is a short timeout to prevent blocking the event loop while still allowing
    # responses to be fetched efficiently. The value is tuned to balance responsiveness
    # and CPU usage. Can be overridden via constructor parameter.
    DEFAULT_FETCH_TIMEOUT = 0.1

    def __init__(
        self,
        engine: Union[Path, Engine],
        executor_config: Optional[tllm.ExecutorConfig] = None,
        is_llm_executor: Optional[bool] = None,
        batched_logits_processor: Optional[BatchedLogitsProcessor] = None,
        postproc_worker_config: Optional[PostprocWorkerConfig] = None,
        hf_model_dir: Optional[Path] = None,
        tokenizer: Optional[TokenizerBase] = None,
        llm_args: Optional[BaseLlmArgs] = None,
        num_workers: Optional[int] = None,
        fetch_timeout: Optional[float] = None,
    ) -> None:
        super().__init__(
            engine=engine,
            executor_config=executor_config,
            batched_logits_processor=batched_logits_processor,
            postproc_worker_config=postproc_worker_config,
            is_llm_executor=is_llm_executor,
            hf_model_dir=hf_model_dir,
            tokenizer=tokenizer,
            llm_args=llm_args,
        )

        # Configure number of RPC workers
        self.num_workers = num_workers if num_workers is not None else self.DEFAULT_NUM_WORKERS

        # Configure fetch timeout
        self._fetch_timeout = fetch_timeout if fetch_timeout is not None else self.DEFAULT_FETCH_TIMEOUT

        # Extract garbage_collection_gen0_threshold from llm_args if available
        self.garbage_collection_gen0_threshold = (
            llm_args.garbage_collection_gen0_threshold if llm_args is not None
Callers (2)

setup_method (method) · 0.90
main_task (method) · 0.85

Calls

no outgoing calls

Tested by (1)

setup_method (method) · 0.72