class SubprocessModuleHandle:
    """
    A handle to a module created as a subprocess. Can send messages to the module and
    receive responses. It only acts as a proxy to the aiohttp server running in the
    subprocess. On destruction, the subprocess is terminated.

    Lifecycle:
    1. In SubprocessModuleHandle creation, the subprocess is started and runs an aiohttp
       server.
    2. User must call start_module() and wait_for_module_ready() first.
    3. SubprocessRouteTable.bind(handle)
    4. app.add_routes(routes=SubprocessRouteTable.bound_routes())
    5. Run the app.

    Health check (_do_periodic_health_check):
    Every 1s, do a health check by _do_once_health_check. If the module is
    unhealthy:
      1. log the exception
      2. log the last N lines of the log file
      3. fail all active requests
      4. restart the module

    TODO(ryw): define policy for health check:
    - check period (Now: 1s)
    - define unhealthy. (Now: process exits. TODO: check_health() for event loop hang)
    - check number of failures in a row before we deem it unhealthy (Now: N/A)
    - "max number of restarts"? (Now: infinite)
    """

    # Class variable. Force using spawn because Ray C bindings have static variables
    # that need to be re-initialized for a new process.
    mp_context = multiprocessing.get_context("spawn")

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        module_cls: type[SubprocessModule],
        config: SubprocessModuleConfig,
    ):
        self.loop = loop
        self.module_cls = module_cls
        self.config = config

        # Increment this when the module is restarted.
        self.incarnation = 0
        # Runtime states, set by start_module() and wait_for_module_ready(),
        # reset by destroy_module().
        self.parent_conn = None
        self.process = None
        self.http_client_session: Optional[aiohttp.ClientSession] = None
        self.health_check_task = None

    def str_for_state(self, incarnation: int, pid: Optional[int]):
        return f"SubprocessModuleHandle(module_cls={self.module_cls.__name__}, incarnation={incarnation}, pid={pid})"

    def __str__(self):
        return self.str_for_state(
            self.incarnation, self.process.pid if self.process else None
        )
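The health-check policy described in the class docstring can be sketched in isolation. The block below is a minimal illustration, not the actual implementation: `FakeProcess`, `ToyHandle`, the `spawn` callable, and the `rounds` parameter are hypothetical names introduced here so the loop terminates and can run without a real subprocess. Step 2 of the policy (logging the last N lines of the log file) is omitted.

```python
import asyncio
import logging
from typing import Callable, Optional, Set

logger = logging.getLogger(__name__)


class FakeProcess:
    """Hypothetical stand-in for a subprocess; not part of the source."""

    def __init__(self, pid: int):
        self.pid = pid
        self.alive = True

    def is_alive(self) -> bool:
        return self.alive


class ToyHandle:
    """Hypothetical, simplified handle that follows the documented policy."""

    def __init__(self, spawn: Callable[[], FakeProcess], period_s: float = 1.0):
        self._spawn = spawn
        self.period_s = period_s
        self.incarnation = 0  # incremented on every restart
        self.process: Optional[FakeProcess] = spawn()
        self.active_requests: Set[asyncio.Future] = set()

    async def do_once_health_check(self) -> None:
        # "Unhealthy" here means only that the process has exited.
        if self.process is None or not self.process.is_alive():
            raise RuntimeError(f"process exited (incarnation={self.incarnation})")

    def _fail_active_requests(self, exc: Exception) -> None:
        for fut in self.active_requests:
            if not fut.done():
                fut.set_exception(exc)
        self.active_requests.clear()

    async def do_periodic_health_check(self, rounds: int) -> None:
        # Assumption: a bounded number of rounds, so the sketch terminates.
        for _ in range(rounds):
            try:
                await self.do_once_health_check()
            except Exception as e:
                logger.exception("module unhealthy")  # 1. log the exception
                self._fail_active_requests(e)         # 3. fail active requests
                self.process = self._spawn()          # 4. restart the module
                self.incarnation += 1
            await asyncio.sleep(self.period_s)


async def demo() -> tuple:
    pids = iter(range(100, 200))
    handle = ToyHandle(spawn=lambda: FakeProcess(next(pids)), period_s=0.01)
    fut = asyncio.get_running_loop().create_future()
    handle.active_requests.add(fut)
    handle.process.alive = False  # simulate a crash of the first incarnation
    await handle.do_periodic_health_check(rounds=2)
    failed = isinstance(fut.exception(), RuntimeError)
    return handle.incarnation, handle.process.pid, failed
```

Calling `asyncio.run(demo())` simulates one crash: the pending request is failed with the health-check error, a replacement process is spawned, and `incarnation` increases by one. There is no cap on restarts, which matches the "max number of restarts: infinite" note in the TODO.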