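Worker node stays in a wait loop while the Ray master node (Head Node) is alive, detected by probing its GCS service port; it stops waiting when the port becomes unreachable or the timeout expires.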
(manager: RayLMDeployManager, timeout_minutes: int = 60)
| 271 | |
| 272 | |
def _gcs_port_reachable(host, port, timeout):
    """Return True if a TCP connection to ``(host, port)`` succeeds.

    ``timeout`` is the socket timeout in seconds. Socket-level failures
    (``OSError`` and its subclasses, e.g. a name-resolution error) are
    reported as "not reachable" instead of being raised.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex returns 0 on success and an errno value otherwise.
            return sock.connect_ex((host, port)) == 0
    except OSError:
        return False


def ray_worker_node_wait(manager: RayLMDeployManager, timeout_minutes: int = 60):
    """Keep a worker node waiting while the Ray head node's GCS port is reachable.

    Master nodes return immediately. A worker probes
    ``(manager.master_addr, RAY_PORT)`` every ``WORKER_WAIT_INTERVAL`` seconds
    and stops waiting when either:

    * the probe fails 3 times in a row (the head node is presumed gone), or
    * ``timeout_minutes`` have elapsed.

    On a worker, ``manager.cleanup()`` is always called before returning,
    including when the wait is interrupted by an exception.

    Args:
        manager: deploy manager; its ``is_master``, ``node_rank``,
            ``master_addr`` and ``cleanup()`` members are used.
        timeout_minutes: maximum time to keep waiting, in minutes.
    """
    if manager.is_master:
        return

    print(f'⏸️ Worker node {manager.node_rank} entering wait mode...')
    # int() so a float WORKER_WAIT_INTERVAL does not break range().
    max_checks = int((timeout_minutes * 60) // WORKER_WAIT_INTERVAL)
    consecutive_failures = 0
    max_consecutive_failures = 3

    try:
        for i in range(max_checks):
            if _gcs_port_reachable(manager.master_addr, RAY_PORT, CONNECTION_CHECK_TIMEOUT):
                consecutive_failures = 0
            else:
                consecutive_failures += 1

            if consecutive_failures >= max_consecutive_failures:
                print('📡 Ray master node GCS service unreachable, worker node exiting')
                break

            # Emit a progress line every 4th probe to keep the log readable.
            if i % 4 == 0:
                elapsed = (i * WORKER_WAIT_INTERVAL) // 60
                print(f'⏳ Worker node {manager.node_rank} waiting... Running for {elapsed} minutes')

            time.sleep(WORKER_WAIT_INTERVAL)
        else:
            # Loop ran to completion without the head node going away.
            print(f'⏰ Worker node {manager.node_rank} wait timeout ({timeout_minutes} minutes)')
    finally:
        # Release resources even if the wait is interrupted (e.g. Ctrl-C).
        manager.cleanup()