MCPcopy
hub / github.com/InternLM/lmdeploy / ApiServerPerTest

Class ApiServerPerTest

autotest/utils/proxy_distributed_utils.py:223–298  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

221
222
223class ApiServerPerTest:
224
225 def __init__(self, proxy_manager: ProxyDistributedManager, config: dict[str, Any], run_config: dict[str, Any]):
226 self.proxy_manager = proxy_manager
227 self.config = config
228 self.run_config = run_config
229
230 model_name = run_config['model']
231 self.model_path = os.path.join(config['model_path'], model_name)
232
233 self.master_addr = proxy_manager.master_addr
234 self.proxy_port = proxy_manager.proxy_port
235 self.node_rank = int(os.getenv('NODE_RANK', '0'))
236 self.node_count = int(os.getenv('NODE_COUNT', '1'))
237 self.proc_per_node = int(os.getenv('PROC_PER_NODE', '1'))
238
239 _pc = run_config.get('parallel_config') or {}
240 _dp = int(_pc.get('dp', 0) or 0)
241 self.expected_instances = _dp if _dp > 1 else 1
242 self.is_master = (self.node_rank == 0)
243 self.api_process = None
244
245 def start(self):
246 proxy_url = f'http://{self.master_addr}:{self.proxy_port}'
247
248 extra_params = self.run_config.get('extra_params', {})
249 resolve_extra_params(extra_params, self.config['model_path'])
250
251 # Get model-name: use extra_params['model-name'] if specified, otherwise use case_name
252 case_name = get_case_str_by_config(self.run_config)
253 self.model_name = case_name if extra_params.get('model-name', None) is None else extra_params.get('model-name')
254
255 cmd = [
256 'lmdeploy',
257 'serve',
258 'api_server',
259 self.model_path,
260 '--model-name',
261 self.model_name,
262 ] + get_cli_common_param(self.run_config).split() + [
263 '--proxy-url',
264 proxy_url,
265 ]
266 if self.node_count > 1:
267 cmd += ['--nnodes', str(self.node_count), '--node-rank', str(self.node_rank)]
268
269 print(f"[API Server] Starting: {' '.join(cmd)}")
270 timestamp = time.strftime('%Y%m%d_%H%M%S')
271 log_dir = self.config.get('server_log_path', '/tmp/lmdeploy_test')
272 os.makedirs(log_dir, exist_ok=True)
273 log_path = os.path.join(log_dir, f'log_{case_name}_{timestamp}.log')
274 self._log_file = open(log_path, 'w')
275 self.api_process = subprocess.Popen(cmd, stdout=self._log_file, stderr=self._log_file)
276 print(f'📝 API Server log: {log_path}')
277
278 def wait_until_ready(self):
279 if not self.is_master:
280 return

Calls

no outgoing calls