hub / github.com/ray-project/ray / SubprocessModuleHandle

Class SubprocessModuleHandle

python/ray/dashboard/subprocesses/handle.py:56–383

A handle to a module created as a subprocess. Can send messages to the module and receive responses. It only acts as a proxy to the aiohttp server running in the subprocess. On destruction, the subprocess is terminated.
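The handle pattern described above (start a child process, wait until it reports ready, terminate it on destruction) can be sketched with the standard library alone. This is a minimal illustration, not Ray's implementation: `ToyHandle`, `_child_main`, `start`, `wait_ready`, and `destroy` are hypothetical names, and a `Pipe` message stands in for whatever readiness signal the real module uses. Only the use of a `"spawn"` multiprocessing context is taken from the source.

```python
import multiprocessing


def _child_main(conn):
    # Runs in the subprocess (hypothetical entry point): report readiness,
    # then block until the parent sends any message or closes the pipe.
    conn.send("ready")
    try:
        conn.recv()
    except EOFError:
        pass
    conn.close()


class ToyHandle:
    """Hypothetical stand-in for a subprocess handle; not Ray's class."""

    # "spawn" starts a fresh interpreter, so module-level and static state
    # is re-initialized in the child instead of being inherited via fork.
    mp_context = multiprocessing.get_context("spawn")

    def __init__(self):
        # Runtime state, set by start() and cleared by destroy().
        self.parent_conn = None
        self.process = None

    def start(self):
        self.parent_conn, child_conn = self.mp_context.Pipe()
        self.process = self.mp_context.Process(
            target=_child_main, args=(child_conn,), daemon=True
        )
        self.process.start()
        child_conn.close()  # the parent keeps only its own end of the pipe

    def wait_ready(self, timeout=30.0):
        # poll() returns False if nothing arrives within the timeout.
        if not self.parent_conn.poll(timeout):
            raise TimeoutError("subprocess did not report ready in time")
        return self.parent_conn.recv()

    def destroy(self):
        if self.process is not None:
            self.process.terminate()
            self.process.join()
        if self.parent_conn is not None:
            self.parent_conn.close()
        self.process = None
        self.parent_conn = None
```

With the spawn start method, code that calls `start()` should sit under an `if __name__ == "__main__":` guard in a script, because the child re-imports the main module before running its target.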

Source from the content-addressed store, hash-verified

class SubprocessModuleHandle:
    """
    A handle to a module created as a subprocess. Can send messages to the module and
    receive responses. It only acts as a proxy to the aiohttp server running in the
    subprocess. On destruction, the subprocess is terminated.

    Lifecycle:
    1. In SubprocessModuleHandle creation, the subprocess is started and runs an aiohttp
       server.
    2. User must call start_module() and wait_for_module_ready() first.
    3. SubprocessRouteTable.bind(handle)
    4. app.add_routes(routes=SubprocessRouteTable.bound_routes())
    5. Run the app.

    Health check (_do_periodic_health_check):
    Every 1s, do a health check by _do_once_health_check. If the module is
    unhealthy:
      1. log the exception
      2. log the last N lines of the log file
      3. fail all active requests
      4. restart the module

    TODO(ryw): define policy for health check:
    - check period (Now: 1s)
    - define unhealthy. (Now: process exits. TODO: check_health() for event loop hang)
    - check number of failures in a row before we deem it unhealthy (Now: N/A)
    - "max number of restarts"? (Now: infinite)
    """

    # Class variable. Force using spawn because Ray C bindings have static variables
    # that need to be re-initialized for a new process.
    mp_context = multiprocessing.get_context("spawn")

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        module_cls: type[SubprocessModule],
        config: SubprocessModuleConfig,
    ):
        self.loop = loop
        self.module_cls = module_cls
        self.config = config

        # Increment this when the module is restarted.
        self.incarnation = 0
        # Runtime states, set by start_module() and wait_for_module_ready(),
        # reset by destroy_module().
        self.parent_conn = None
        self.process = None
        self.http_client_session: Optional[aiohttp.ClientSession] = None
        self.health_check_task = None

    def str_for_state(self, incarnation: int, pid: Optional[int]):
        return f"SubprocessModuleHandle(module_cls={self.module_cls.__name__}, incarnation={incarnation}, pid={pid})"

    def __str__(self):
        return self.str_for_state(
            self.incarnation, self.process.pid if self.process else None
        )

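The health-check policy in the docstring (check on a fixed period; on failure log, fail active requests, restart) can be sketched as a small asyncio loop. This is an illustration under stated assumptions rather than the code in handle.py: `ToyModule`, `ToySupervisor`, `active_requests`, and the `max_checks` bound are hypothetical, step 2 (logging the tail of the log file) is omitted, and the process is replaced by an in-memory flag. The real loop presumably runs until cancelled; the bound here only keeps the sketch finite.

```python
import asyncio


class ToyModule:
    """Hypothetical stand-in for the supervised subprocess."""

    def __init__(self):
        self.alive = True

    def is_alive(self):
        return self.alive


class ToySupervisor:
    def __init__(self, period_s=1.0):
        self.period_s = period_s
        self.incarnation = 0          # incremented on every restart
        self.module = ToyModule()
        self.active_requests = set()  # futures awaiting a module response
        self.log = []                 # stands in for a real logger

    def do_once_health_check(self):
        # "Unhealthy" currently means only that the process has exited.
        if not self.module.is_alive():
            raise RuntimeError("module process exited")

    def restart(self):
        self.incarnation += 1
        self.module = ToyModule()

    async def do_periodic_health_check(self, max_checks):
        for _ in range(max_checks):
            try:
                self.do_once_health_check()
            except Exception as e:
                # 1. Log the exception.
                self.log.append(f"unhealthy (incarnation {self.incarnation}): {e}")
                # 3. Fail all active requests so callers do not hang.
                for fut in list(self.active_requests):
                    if not fut.done():
                        fut.set_exception(ConnectionError("module restarted"))
                self.active_requests.clear()
                # 4. Restart the module.
                self.restart()
            await asyncio.sleep(self.period_s)
```

A caller holding one of the failed futures sees `ConnectionError` when it awaits the result, and the incarnation counter distinguishes responses from the old process and the new one.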
Calls 1

get_context · Method · 0.45

Tested by 3

start_http_server_app · Function · 0.72
