Universal distributed test executor (using shared Ray cluster)
(
config,
run_config,
worker_id,
test_type='infer',
manager=None, # ← New parameter: pass in shared manager
eval_config_name='default')
| 17 | |
| 18 | |
def _run_ray_distributed_test(
        config,
        run_config,
        worker_id,
        test_type='infer',
        manager=None,  # shared cluster manager; required despite the default
        eval_config_name='default'):
    """Universal distributed test executor (using shared Ray cluster).

    On the master node (``manager.is_master``) this starts the LMDeploy API
    server for the model named by ``run_config['model']``, runs ``eval_test``
    against ``constant.PROXY_PORT``, and always calls
    ``manager.cleanup(force=False)`` afterwards, including when server startup
    itself fails. On any other node it sleeps 10 seconds and then blocks in
    ``ray_worker_node_wait``.

    Args:
        config: Global test config. The master node reads
            ``config['model_path']`` and the optional ``config['eval_path']``.
        run_config: Per-case run config. ``run_config['model']`` is joined onto
            ``config['model_path']``, and the dict is also used to derive the
            test case name.
        worker_id: Accepted for interface compatibility; not used in this body.
        test_type: Test type forwarded to ``eval_test`` and used in log
            messages (default ``'infer'``).
        manager: Shared cluster manager instance. Must not be ``None``.
        eval_config_name: Requested eval preset name. It is resolved through
            ``resolve_eval_config_name`` before being used to look up the
            preset keyword arguments passed to ``eval_test``.

    Raises:
        AssertionError: If ``manager`` is ``None`` or the evaluation reports
            failure.
    """
    assert manager is not None, 'Manager instance must be provided'
    eval_config_name = resolve_eval_config_name(config, run_config, eval_config_name)

    preset_config = get_eval_preset_config(config, run_config, eval_config_name)

    if not manager.is_master:
        # Non-master nodes do not run the evaluation; they only wait.
        time.sleep(10)
        # NOTE(review): 4880 minutes is roughly 81 hours; confirm this is the
        # intended timeout and not a typo for a smaller value.
        ray_worker_node_wait(manager, timeout_minutes=4880)
        return

    model_path = os.path.join(config['model_path'], run_config['model'])
    eval_path = config.get('eval_path')

    try:
        # Start the API server for the current model inside the try block so
        # that cleanup still runs if startup fails part-way.
        manager.start_lmdeploy_api_server(config=config, run_config=run_config)

        print(f'🧪 Master node executing {test_type} test ({eval_config_name})...')
        case_name = get_case_str_by_config(run_config)

        result, msg = eval_test(model_path,
                                eval_path,
                                case_name,
                                port=constant.PROXY_PORT,
                                test_type=test_type,
                                **preset_config)
        assert result, f'❌ {test_type} test failed: {msg}'
        print(f'✅ {test_type} test passed')

    finally:
        # Always clean up the API server started above for the current model.
        manager.cleanup(force=False)
| 59 | |
| 60 | def _run_proxy_distributed_test(config, |
no test coverage detected