MCPcopy
hub / github.com/locustio/locust / close

Method close

locust/test/subprocess_utils.py:95–118  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

93 self.close()
94
95 def close(self) -> None:
96 __tracebackhide__ = True
97
98 if self.use_pty:
99 os.close(self.stdin_m)
100 os.close(self.stdin_s)
101
102 try:
103 if self.sigint_on_exit and not self._terminated:
104 self.terminate()
105 proc_return_code = self.proc.wait(timeout=self.join_timeout)
106
107 # Locust does not perform a graceful shutdown on Windows since we send SIGTERM
108 if not IS_WINDOWS and self.expect_return_code is not None and proc_return_code != self.expect_return_code:
109 self.on_fail(
110 f"Process exited with return code {proc_return_code}. Expected {self.expect_return_code} ({proc_return_code} != {self.expect_return_code})"
111 )
112 except subprocess.TimeoutExpired:
113 self.proc.kill()
114 self.proc.wait()
115 self.on_fail(f"Process took more than {self.join_timeout} seconds to terminate.")
116
117 self.stdout_reader.join(timeout=self.join_timeout)
118 self.stderr_reader.join(timeout=self.join_timeout)
119
120 # Check output logs from last found (stateful)
121 def expect(self, to_expect, *, stream="stderr"):

Callers 9

test_json_schemaMethod · 0.95
__exit__Method · 0.95
tearDownMethod · 0.45
test_client_retryMethod · 0.45
test_rpc_errorMethod · 0.45
temporary_fileFunction · 0.45
get_free_tcp_portFunction · 0.45

Calls 3

terminateMethod · 0.95
on_failMethod · 0.95
waitMethod · 0.45

Tested by 6

test_json_schemaMethod · 0.76
tearDownMethod · 0.36
test_client_retryMethod · 0.36
test_rpc_errorMethod · 0.36