MCPcopy Index your code
hub / github.com/pathwaycom/pathway / wait_for_process_handles

Function wait_for_process_handles

python/pathway/cli.py:145–178  ·  view source on GitHub ↗
(
    process_handles: list[subprocess.Popen],
    timeout: float,
)

Source from the content-addressed store, hash-verified

143
144
145def wait_for_process_handles(
146 process_handles: list[subprocess.Popen],
147 timeout: float,
148) -> ProcessHandlesState:
149 result = ProcessHandlesState()
150 for _ in range(2):
151 # A process may finish by requesting scaling, but do so after we have already
152 # checked its exit status. At the same time, another process may terminate with
153 # exit code 1, indicating a failure to communicate with the process that requested
154 # scaling. In that case, only the latter exit code would be observed, causing the
155 # program to terminate due to an error.
156 #
157 # To prevent this, once an error is detected, we must first verify that no scaling
158 # request was issued by the second pass.
159
160 for handle in process_handles:
161 try:
162 maybe_exit_code = handle.wait(timeout)
163 except subprocess.TimeoutExpired:
164 result.has_working_process = True
165 continue
166 if maybe_exit_code == EXIT_CODE_DOWNSCALE:
167 result.needs_downscaling = True
168 elif maybe_exit_code == EXIT_CODE_UPSCALE:
169 result.needs_upscaling = True
170 elif maybe_exit_code != 0:
171 result.has_process_with_error = True
172
173 if not result.has_process_with_error:
174 # If there is no process with an error, then a retry described above is redundant
175 # and can be avoided.
176 break
177
178 return result
179
180
181def terminate_process_handles(process_handles: list[subprocess.Popen]):

Callers 2

wait_for_scaling_eventFunction · 0.90
spawn_programFunction · 0.85

Calls 1

ProcessHandlesStateClass · 0.85

Tested by 1

wait_for_scaling_eventFunction · 0.72