| 221 | |
| 222 | |
| 223 | class ApiServerPerTest: |
| 224 | |
| 225 | def __init__(self, proxy_manager: ProxyDistributedManager, config: dict[str, Any], run_config: dict[str, Any]): |
| 226 | self.proxy_manager = proxy_manager |
| 227 | self.config = config |
| 228 | self.run_config = run_config |
| 229 | |
| 230 | model_name = run_config['model'] |
| 231 | self.model_path = os.path.join(config['model_path'], model_name) |
| 232 | |
| 233 | self.master_addr = proxy_manager.master_addr |
| 234 | self.proxy_port = proxy_manager.proxy_port |
| 235 | self.node_rank = int(os.getenv('NODE_RANK', '0')) |
| 236 | self.node_count = int(os.getenv('NODE_COUNT', '1')) |
| 237 | self.proc_per_node = int(os.getenv('PROC_PER_NODE', '1')) |
| 238 | |
| 239 | _pc = run_config.get('parallel_config') or {} |
| 240 | _dp = int(_pc.get('dp', 0) or 0) |
| 241 | self.expected_instances = _dp if _dp > 1 else 1 |
| 242 | self.is_master = (self.node_rank == 0) |
| 243 | self.api_process = None |
| 244 | |
| 245 | def start(self): |
| 246 | proxy_url = f'http://{self.master_addr}:{self.proxy_port}' |
| 247 | |
| 248 | extra_params = self.run_config.get('extra_params', {}) |
| 249 | resolve_extra_params(extra_params, self.config['model_path']) |
| 250 | |
| 251 | # Get model-name: use extra_params['model-name'] if specified, otherwise use case_name |
| 252 | case_name = get_case_str_by_config(self.run_config) |
| 253 | self.model_name = case_name if extra_params.get('model-name', None) is None else extra_params.get('model-name') |
| 254 | |
| 255 | cmd = [ |
| 256 | 'lmdeploy', |
| 257 | 'serve', |
| 258 | 'api_server', |
| 259 | self.model_path, |
| 260 | '--model-name', |
| 261 | self.model_name, |
| 262 | ] + get_cli_common_param(self.run_config).split() + [ |
| 263 | '--proxy-url', |
| 264 | proxy_url, |
| 265 | ] |
| 266 | if self.node_count > 1: |
| 267 | cmd += ['--nnodes', str(self.node_count), '--node-rank', str(self.node_rank)] |
| 268 | |
| 269 | print(f"[API Server] Starting: {' '.join(cmd)}") |
| 270 | timestamp = time.strftime('%Y%m%d_%H%M%S') |
| 271 | log_dir = self.config.get('server_log_path', '/tmp/lmdeploy_test') |
| 272 | os.makedirs(log_dir, exist_ok=True) |
| 273 | log_path = os.path.join(log_dir, f'log_{case_name}_{timestamp}.log') |
| 274 | self._log_file = open(log_path, 'w') |
| 275 | self.api_process = subprocess.Popen(cmd, stdout=self._log_file, stderr=self._log_file) |
| 276 | print(f'📝 API Server log: {log_path}') |
| 277 | |
| 278 | def wait_until_ready(self): |
| 279 | if not self.is_master: |
| 280 | return |
no outgoing calls