Instance of TurboMind. Args: engine (Engine): the engine this instance is bound to; its request manager supplies the instance's request sender.
| 117 | |
| 118 | |
| 119 | class EngineInstance(EngineInstanceBase): |
| 120 | """Instance of TurboMind. |
| 121 | |
| 122 | Args: |
| 123 | engine (Engine): engine |
| 124 | """ |
| 125 | |
def __init__(self, engine: Engine):
    """Bind this instance to ``engine`` and obtain a request sender.

    Args:
        engine (Engine): engine whose ``req_manager`` builds the sender used
            by this instance, and whose ``max_session_len`` and
            ``engine_config`` configure it.
    """
    self.engine = engine
    # A dedicated sender per instance, built by the engine's request manager.
    self.req_sender = engine.req_manager.build_sender()
    self.max_input_len = engine.max_session_len

    config = engine.engine_config
    # Object-reference transfer (read by ``_get_extra_outputs``) is only
    # enabled when the config flag is set AND the executor backend is Ray.
    self._enable_transfer_obj_ref = (config.enable_transfer_obj_ref
                                     and config.distributed_executor_backend == 'ray')
    if self._enable_transfer_obj_ref:
        # NOTE(review): presumably sets up the module-level ``_SHARED_STORE``
        # that ``_get_extra_outputs`` publishes to — confirm.
        _lazy_create_ray_store()
| 135 | |
def __del__(self):
    """Destructor: unregister this instance's sender from the engine.

    Removes the entry keyed by ``self.req_sender.sender_id`` from
    ``self.engine.req_manager.senders``.

    Finalizers must never raise (Python can only print "Exception ignored"
    warnings). This method therefore tolerates:

    * a partially constructed instance, where ``__init__`` raised before
      ``engine`` or ``req_sender`` was assigned (e.g. ``build_sender()``
      failed);
    * a sender id that is no longer present in ``senders``.
    """
    engine = getattr(self, 'engine', None)
    sender = getattr(self, 'req_sender', None)
    if engine is None or sender is None:
        # __init__ did not get far enough to register a sender.
        return
    # Default of None: missing key is not an error during finalization.
    # NOTE(review): assumes ``senders`` is a dict keyed by sender_id (as the
    # keyed ``pop`` implies) — confirm.
    engine.req_manager.senders.pop(sender.sender_id, None)
| 139 | |
def _get_extra_outputs(self, resp: Response, num_all_ids: int):
    """Collect optional extra outputs for a response.

    Currently the only extra output is ``routed_experts``.

    Args:
        resp (Response): engine response; its ``data`` mapping may carry a
            ``'routed_experts'`` entry.
        num_all_ids (int): id count for the request. When object-reference
            transfer is enabled, ``routed_experts`` is expected to have
            ``num_all_ids - 1`` entries along its first dimension.

    Returns:
        dict: ``{'routed_experts': value}``. ``value`` is ``None`` unless the
        response type is FINISH or CANCEL and routed experts are present.
        In that case it is either the data itself, or (with object-reference
        transfer enabled) the result of ``ray.get`` on the shared store's
        ``put``.
    """
    experts = resp.data.get('routed_experts', None) if resp.data else None
    if experts is None or resp.type not in (ResponseType.FINISH, ResponseType.CANCEL):
        return {'routed_experts': None}

    if not self._enable_transfer_obj_ref:
        # Direct path: data is returned as-is, without a length check.
        return {'routed_experts': experts}

    import ray

    # Trim to the expected length before publishing; warn on any mismatch.
    expected = num_all_ids - 1
    if experts.shape[0] != expected:
        logger.warning(f'Expected number of routed_experts: {expected}, '
                       f'but got {experts.shape[0]}')
        experts = experts[:expected]
    # ray.get waits for the remote put and returns its result (the key).
    key = ray.get(_SHARED_STORE.put.remote(experts))
    return {'routed_experts': key}
| 158 | |
async def _async_try_add_session(self, session_id: int):
    """Asynchronously try to add a new session through this instance's sender.

    Args:
        session_id (int): id of the session to add.

    Returns:
        Whatever ``async_try_add_session`` returns for this sender and id.
    """
    sender = self.req_sender
    result = await async_try_add_session(sender, session_id)
    return result
| 166 | |
def _try_add_session(self, session_id: int):
    """Try to add a new session through this instance's sender (blocking variant).

    Args:
        session_id (int): id of the session to add.

    Returns:
        Whatever ``try_add_session`` returns for this sender and id.
    """
    sender = self.req_sender
    result = try_add_session(sender, session_id)
    return result
| 174 | |
| 175 | async def async_stream_infer(self, |
| 176 | session_id: int, |
no outgoing calls