Worker node waits by periodically checking if the master's proxy service is still alive. If the proxy becomes unreachable for several consecutive checks, assume master has finished. Args: manager: ProxyDistributedManager instance; timeout_minutes: Maximum time to wait before giving up (default: 120 minutes).
(manager, timeout_minutes: int = 120)
| 152 | |
| 153 | |
def proxy_worker_node_wait(manager, timeout_minutes: int = 120):
    """Keep a worker node waiting while the master's proxy port is reachable.

    The master's proxy port is probed once every ``WORKER_WAIT_INTERVAL``
    seconds. Three failed probes in a row are taken to mean the master has
    finished, and the wait ends early. A successful probe resets the failure
    streak. If that never happens, the wait ends after as many probes as fit
    into ``timeout_minutes``.

    Args:
        manager: ProxyDistributedManager instance; its ``node_rank``,
            ``master_addr`` and ``proxy_port`` attributes are read.
        timeout_minutes: Maximum time to wait before giving up
            (default: 120 minutes).
    """
    print(f'⏸️ Worker node {manager.node_rank} entering monitoring mode...')

    total_probes = (timeout_minutes * 60) // WORKER_WAIT_INTERVAL
    failure_limit = 3
    failure_streak = 0
    master_stopped = False

    for probe_index in range(total_probes):
        proxy_alive = is_port_open(manager.master_addr, manager.proxy_port, timeout=2.0)

        if proxy_alive:
            # Any successful probe clears the streak; only back-to-back failures count.
            failure_streak = 0
        else:
            failure_streak += 1
            print(f'⚠️ Proxy connection to master failed ({failure_streak}/{failure_limit})')
            if failure_streak >= failure_limit:
                print('📡 Master proxy service stopped, worker node exiting')
                master_stopped = True
                break

        # Report progress on every fourth probe to keep the log readable.
        if probe_index % 4 == 0:
            minutes_running = (probe_index * WORKER_WAIT_INTERVAL) // 60
            print(f'⏳ Worker node {manager.node_rank} monitoring... Running for {minutes_running} minutes')

        time.sleep(WORKER_WAIT_INTERVAL)

    # Reached only when the probe budget ran out without detecting a master shutdown.
    if not master_stopped:
        print(f'⏰ Worker node {manager.node_rank} monitoring timed out ({timeout_minutes} minutes)')

    print(f'✅ Worker node {manager.node_rank} completed waiting')
| 188 | |
| 189 | |
| 190 | class ProxyDistributedManager: |