Args:
- worker_kwargs: kwargs for the RPC worker
- model_world_size: the world size of the model
- mpi_session: the MPI session to use
- postproc_worker_config: the postproc worker config
- is_llm_executor: whether this is an LLM executor
(
self,
worker_kwargs: dict,
model_world_size: int = 1,
mpi_session: Optional[MpiSession] = None,
*,
postproc_worker_config: Optional[PostprocWorkerConfig] = None,
is_llm_executor: Optional[bool] = None,
)
| 18 | INSTANCE_COUNTER = 0 |
| 19 | |
| 20 | def __init__( |
| 21 | self, |
| 22 | worker_kwargs: dict, |
| 23 | model_world_size: int = 1, |
| 24 | mpi_session: Optional[MpiSession] = None, |
| 25 | *, |
| 26 | postproc_worker_config: Optional[PostprocWorkerConfig] = None, |
| 27 | is_llm_executor: Optional[bool] = None, |
| 28 | ): |
| 29 | """ |
| 30 | Args: |
| 31 | worker_kwargs: kwargs for the rpc worker |
| 32 | model_world_size: the world size of the model |
| 33 | mpi_session: the mpi session to use |
| 34 | postproc_worker_config: the postproc worker config |
| 35 | is_llm_executor: whether this is an llm executor |
| 36 | """ |
| 37 | GenerationExecutorRpcProxy.INSTANCE_COUNTER += 1 |
| 38 | self.init_rpc_executor() |
| 39 | |
| 40 | postproc_worker_config = postproc_worker_config or PostprocWorkerConfig( |
| 41 | ) |
| 42 | |
| 43 | super().__init__( |
| 44 | num_postprocess_workers=postproc_worker_config. |
| 45 | num_postprocess_workers, |
| 46 | postprocess_tokenizer_dir=postproc_worker_config. |
| 47 | postprocess_tokenizer_dir, |
| 48 | is_llm_executor=is_llm_executor, |
| 49 | ) |
| 50 | |
| 51 | self._create_mpi_session(model_world_size, mpi_session) |
| 52 | |
| 53 | # Inject the generated HMAC key into worker_kwargs for workers |
| 54 | worker_kwargs['hmac_key'] = self.hmac_key |
| 55 | self.worker_kwargs = worker_kwargs |
| 56 | |
| 57 | self.launch_workers() |
| 58 | |
| 59 | # Invoke model creation on the remote |
| 60 | # TBD: Move model creation to the mpi task, or left in RPC? |
| 61 | self.setup_engine_remote() |
| 62 | |
| 63 | # Setup main loop after engine is ready |
| 64 | self._setup_mainloop_with_tasks() |
| 65 | |
| 66 | def launch_workers(self): |
| 67 | logger.debug(f"Launching workers") |
Nothing calls this directly.
No test coverage detected.