Wraps a subprocess for testing purposes.
| 12 | |
| 13 | |
| 14 | class TestProcess: |
| 15 | """ |
| 16 | Wraps a subprocess for testing purposes. |
| 17 | """ |
| 18 | |
| 19 | __test__ = False |
| 20 | |
| 21 | def __init__( |
| 22 | self, |
| 23 | command: str, |
| 24 | *, |
| 25 | extra_env: dict[str, str] = {}, |
| 26 | expect_return_code: int | None = 0, |
| 27 | sigint_on_exit: bool = True, |
| 28 | expect_timeout: int = 5, |
| 29 | use_pty: bool = False, |
| 30 | join_timeout: int = 1, |
| 31 | ): |
| 32 | self.proc: subprocess.Popen[str] |
| 33 | self._terminated = False |
| 34 | self._failed = False |
| 35 | |
| 36 | self.expect_return_code = expect_return_code |
| 37 | self.expect_timeout = expect_timeout |
| 38 | self.sigint_on_exit = sigint_on_exit |
| 39 | self.join_timeout = join_timeout |
| 40 | |
| 41 | self.stderr_output: list[str] = [] |
| 42 | self.stdout_output: list[str] = [] |
| 43 | self._stderr_cursor: int = 0 # Used for stateful log matching |
| 44 | self._stdout_cursor: int = 0 |
| 45 | |
| 46 | self.use_pty: bool = use_pty |
| 47 | # Create PTY pair |
| 48 | if self.use_pty: |
| 49 | if IS_WINDOWS: |
| 50 | raise Exception("termios doesn't exist on windows, and thus we cannot import pty") |
| 51 | |
| 52 | import pty |
| 53 | |
| 54 | self.stdin_m, self.stdin_s = pty.openpty() |
| 55 | |
| 56 | self.proc = subprocess.Popen( |
| 57 | shlex.split(command) if not IS_WINDOWS else command.split(" "), |
| 58 | env={ |
| 59 | "PYTHONUNBUFFERED": "1", |
| 60 | **os.environ, |
| 61 | "OTEL_LOGS_EXPORTER": "none", |
| 62 | "OTEL_METRICS_EXPORTER": "none", |
| 63 | "OTEL_TRACES_EXPORTER": "none", |
| 64 | **extra_env, |
| 65 | }, |
| 66 | stdin=self.stdin_s if self.use_pty else None, |
| 67 | stdout=subprocess.PIPE, |
| 68 | stderr=subprocess.PIPE, |
| 69 | text=True, |
| 70 | ) |
| 71 |
no outgoing calls
searching dependent graphs…