Main loop of the engine. Each engine instance communicates with the engine through a queue.
(self)
| 403 | return forward_inputs, next_running |
| 404 | |
async def main_loop(self):
    """Run the engine's main loop until ``self.stop_event`` is set.

    Each iteration does one of three things:

    1. If ``self._sleep_requested`` is truthy, discard any saved batch,
       set ``self._main_sleep_drain_event`` and wait on
       ``self._sleep_resume_event`` before re-checking the loop condition.
    2. If no batch is pending, ask ``_main_loop_try_send_next_inputs()`` for
       one; when none is returned, log a warning and retry after a short
       sleep (the warning is skipped while a sleep is requested).
    3. Otherwise activate the pending batch, await
       ``_main_loop_get_outputs()`` and keep the ``(forward_inputs,
       next_running)`` pair it returns for the following iteration.

    The original note for this method states that each engine instance
    communicates with the engine through a queue; no queue is accessed
    directly in this body.

    ``self.stop_event`` is only examined at the top of each iteration, so
    the loop does not exit while it is blocked in one of the awaits below.
    """
    # Local aliases, read once before the loop starts.
    has_runable_event = self.has_runable_event
    scheduler = self.scheduler
    # State carried from one iteration to the next: the batch expected to run
    # next and the inputs that go with it. ``None`` means nothing is pending.
    forward_inputs = None
    next_running = None

    async def __no_running_warning():
        # Logs the free/total GPU block counts and then backs off for 0.1 s
        # so the caller's retry loop does not spin.
        # TODO (JimyMa): add watermark check event instead of async sleep.
        # self.perfill_watermark_event.wait()
        logger.warning(f'no next prefill running request, Maybe cache is full, '
                       f'free gpu cache blocks: {scheduler.block_manager.get_num_free_gpu_blocks()}, '
                       f'total gpu cache blocks: {scheduler.block_manager.num_gpu_blocks}')
        await asyncio.sleep(0.1)

    while not self.stop_event.is_set():
        if self._sleep_requested:
            # Drop prefetched work from before sleep. Sleep ends scheduler
            # sessions and releases KV cache, so any saved next batch is
            # stale after the drain point.
            forward_inputs = None
            next_running = None
            # Acknowledge that no new forward input will be scheduled until
            # wakeup resumes this loop.
            self._main_sleep_drain_event.set()
            await self._sleep_resume_event.wait()
            # Restart the iteration so both the stop and sleep flags are
            # re-evaluated before any scheduling happens.
            continue

        if next_running is None:
            # Nothing pending from the previous iteration: try to obtain a
            # batch now.
            # NOTE(review): presumably this also submits the inputs for
            # execution, judging by the helper's name -- confirm.
            forward_inputs, next_running = await self._main_loop_try_send_next_inputs()
            if next_running is None:
                if self._sleep_requested:
                    # A sleep request arrived during the await above; go
                    # straight to the sleep branch without warning/backoff.
                    continue
                await __no_running_warning()
                continue

        scheduler.activate_seqs(next_running)
        # The returned pair replaces the pending state, so a non-None
        # ``next_running`` skips the "try send" branch on the next iteration.
        forward_inputs, next_running = await self._main_loop_get_outputs(
            running=next_running,
            forward_inputs=forward_inputs,
        )
        self.inputs_maker.deactivate_evict_seqs()
        # Set at the end of every completed step.
        # NOTE(review): the waiter on this event is outside this view.
        has_runable_event.set()
| 451 | |
| 452 | def update_running_migration(self, running: 'SeqList', next_token_ids: np.ndarray, stopped: torch.Tensor, |
| 453 | model_metas: list[dict[str, Any]]): |
no test coverage detected