(self,
req_manager: 'RequestManager',
scheduler: 'Scheduler',
executor: 'ExecutorBase',
seq_strategy: 'SequenceStrategy',
inputs_maker: 'InputsMakerAsync',
config: EngineLoopConfig,
engine_conn: Optional['EngineP2PConnection'] = None)
| 107 | """Engine loop manager should be created in an async context.""" |
| 108 | |
| 109 | def __init__(self, |
| 110 | req_manager: 'RequestManager', |
| 111 | scheduler: 'Scheduler', |
| 112 | executor: 'ExecutorBase', |
| 113 | seq_strategy: 'SequenceStrategy', |
| 114 | inputs_maker: 'InputsMakerAsync', |
| 115 | config: EngineLoopConfig, |
| 116 | engine_conn: Optional['EngineP2PConnection'] = None): |
| 117 | self.req_manager = req_manager |
| 118 | self.scheduler = scheduler |
| 119 | self.executor = executor |
| 120 | self.seq_strategy = seq_strategy |
| 121 | self.inputs_maker = inputs_maker |
| 122 | self.config = config |
| 123 | self.engine_conn = engine_conn |
| 124 | |
| 125 | # tasks and control events |
| 126 | self.tasks: set[asyncio.Task] = set() |
| 127 | self.stop_event = asyncio.Event() |
| 128 | self.resp_queue = asyncio.Queue() |
| 129 | self.forward_event = CounterEvent() |
| 130 | self.migration_event = asyncio.Event() |
| 131 | self.has_runable_event = RunableEventAsync(self.scheduler) |
| 132 | # Sleep uses a small handshake with the scheduling loops: |
| 133 | # 1. sleep() sets _sleep_requested and waits for main/migration drain events. |
| 134 | # 2. main_loop and migration_loop reach safe boundaries, acknowledge |
| 135 | # drain, then wait on _sleep_resume_event. |
| 136 | # 3. wakeup() sets _sleep_resume_event so scheduling can continue. |
| 137 | self._sleep_requested = False |
| 138 | self._main_sleep_drain_event = asyncio.Event() |
| 139 | self._main_sleep_drain_event.set() |
| 140 | self._migration_sleep_drain_event = asyncio.Event() |
| 141 | self._migration_sleep_drain_event.set() |
| 142 | self._sleep_resume_event = asyncio.Event() |
| 143 | self._sleep_resume_event.set() |
| 144 | |
| 145 | # check init |
| 146 | if self.config.role != EngineRole.Hybrid: |
| 147 | assert self.engine_conn is not None, 'Engine connection must be provided for non-hybrid engine role.' |
| 148 | |
| 149 | async def preprocess_loop(self): |
| 150 | """Preprocess request.""" |
no test coverage detected