MCPcopy
hub / github.com/InternLM/lmdeploy / ray_worker_node_wait

Function ray_worker_node_wait

autotest/utils/ray_distributed_utils.py:273–307  ·  view source on GitHub ↗

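The worker node waits while the Ray master node (head node) stays alive, checking liveness by polling its GCS service port; it exits once that port is repeatedly unreachable or the timeout elapses.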

(manager: RayLMDeployManager, timeout_minutes: int = 60)

Source from the content-addressed store, hash-verified

271
272
def ray_worker_node_wait(manager: RayLMDeployManager, timeout_minutes: int = 60):
    """Keep a worker node waiting while the Ray master (head) node is reachable.

    The master's liveness is checked by attempting a TCP connection to
    ``(manager.master_addr, RAY_PORT)`` (the GCS service port) once every
    ``WORKER_WAIT_INTERVAL`` seconds. Waiting stops when:

    * the port has been unreachable for three consecutive checks, or
    * ``timeout_minutes`` worth of checks have been performed.

    ``manager.cleanup()`` is then called. It is also called if the wait loop
    is interrupted by an exception (e.g. ``KeyboardInterrupt`` during
    ``time.sleep``); the exception is re-raised afterwards.

    On the master node (``manager.is_master``) this returns immediately and
    does not call ``cleanup()``.

    Args:
        manager: Node manager; ``is_master``, ``node_rank`` and ``master_addr``
            are read, and ``cleanup()`` is invoked when waiting ends.
        timeout_minutes: Upper bound, in minutes, on how long the worker waits.
    """
    if manager.is_master:
        return

    print(f'⏸️ Worker node {manager.node_rank} entering wait mode...')
    max_checks = (timeout_minutes * 60) // WORKER_WAIT_INTERVAL
    consecutive_failures = 0
    max_consecutive_failures = 3

    # try/finally guarantees cleanup even if the loop is interrupted
    # (previously an exception here skipped manager.cleanup()).
    try:
        for i in range(max_checks):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(CONNECTION_CHECK_TIMEOUT)
                    reachable = sock.connect_ex((manager.master_addr, RAY_PORT)) == 0
            except OSError:
                # connect_ex reports most connection errors via its return
                # value, but socket creation and name resolution can still
                # raise; treat those as a failed check too.
                reachable = False

            consecutive_failures = 0 if reachable else consecutive_failures + 1

            if consecutive_failures >= max_consecutive_failures:
                print('📡 Ray master node GCS service unreachable, worker node exiting')
                break

            # Progress message every 4 checks.
            if i % 4 == 0:
                elapsed = (i * WORKER_WAIT_INTERVAL) // 60
                print(f'⏳ Worker node {manager.node_rank} waiting... Running for {elapsed} minutes')

            time.sleep(WORKER_WAIT_INTERVAL)
        else:
            # Loop finished without break: master stayed reachable for the
            # whole timeout window.
            print(f'⏰ Worker node {manager.node_rank} wait timeout ({timeout_minutes} minutes)')
    finally:
        manager.cleanup()

Callers 2

Calls 2

sleep (method) · 0.45
cleanup (method) · 0.45

Tested by 2