MCPcopy
hub / github.com/locustio/locust / TestProcess

Class TestProcess

locust/test/subprocess_utils.py:14–183  ·  view source on GitHub ↗

Wraps a subprocess for testing purposes.

Source from the content-addressed store, hash-verified

12
13
14class TestProcess:
15 """
16 Wraps a subprocess for testing purposes.
17 """
18
19 __test__ = False
20
def __init__(
    self,
    command: str,
    *,
    extra_env: dict[str, str] | None = None,
    expect_return_code: int | None = 0,
    sigint_on_exit: bool = True,
    expect_timeout: int = 5,
    use_pty: bool = False,
    join_timeout: int = 1,
):
    """
    Launch ``command`` as a child process with piped, text-mode stdout/stderr.

    Args:
        command: Command line to run. Tokenized with ``shlex.split`` except on
            Windows, where it is split on single spaces instead.
        extra_env: Extra environment variables for the child. They are applied
            last, so they override both the inherited environment and the
            defaults set here. ``None`` (the default) adds nothing.
        expect_return_code: Stored on the instance; consumed by code outside
            this method.
        sigint_on_exit: Stored on the instance; consumed by code outside this
            method.
        expect_timeout: Stored on the instance; consumed by code outside this
            method.
        use_pty: If true, open a PTY pair and use its slave end as the child's
            stdin. Not available on Windows.
        join_timeout: Stored on the instance; consumed by code outside this
            method.

    Raises:
        Exception: If ``use_pty`` is true on Windows.
    """
    self.proc: subprocess.Popen[str]
    self._terminated = False
    self._failed = False

    self.expect_return_code = expect_return_code
    self.expect_timeout = expect_timeout
    self.sigint_on_exit = sigint_on_exit
    self.join_timeout = join_timeout

    self.stderr_output: list[str] = []
    self.stdout_output: list[str] = []
    self._stderr_cursor: int = 0  # Used for stateful log matching
    self._stdout_cursor: int = 0

    self.use_pty: bool = use_pty
    # Create PTY pair
    if self.use_pty:
        if IS_WINDOWS:
            raise Exception("termios doesn't exist on windows, and thus we cannot import pty")

        # Imported lazily: the module is only importable on non-Windows platforms.
        import pty

        self.stdin_m, self.stdin_s = pty.openpty()

    self.proc = subprocess.Popen(
        shlex.split(command) if not IS_WINDOWS else command.split(" "),
        env={
            # Listed before os.environ so a value inherited from the parent wins.
            "PYTHONUNBUFFERED": "1",
            **os.environ,
            # Listed after os.environ so these override any inherited setting.
            "OTEL_LOGS_EXPORTER": "none",
            "OTEL_METRICS_EXPORTER": "none",
            "OTEL_TRACES_EXPORTER": "none",
            # Caller-supplied variables take precedence over everything above.
            # A None sentinel replaces the former shared mutable `{}` default.
            **(extra_env or {}),
        },
        stdin=self.stdin_s if self.use_pty else None,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
71

Calls

no outgoing calls

Used in the wild: real call sites across dependent graphs

searching dependent graphs…