(self)
| 189 | self.exit_code = 1 |
| 190 | |
def execute(self) -> bool:
    """Fork a pseudo-terminal child and replace it with ``self.cmd``.

    The process switches to ``self.working_directory`` before forking;
    the parent switches back to its previous directory afterwards.

    Returns:
        ``True`` in the parent process, once the child's file descriptor
        has been registered with ``self.poll_object``.
        ``False`` in the child process when ``self.cmd[0]`` could not be
        found; ``self.exit_code`` is set to 1 in that case.
    """
    import pty

    original_cwd = os.getcwd()
    if original_cwd != self.working_directory:
        os.chdir(str(self.working_directory))

    # Note: per the original author, a Python exception raised between
    # this point and os.close() ends up inside the child's stdout, and
    # `os.read(self.child_fd, 8192)` is the only way to retrieve that
    # traceback without losing it.
    self.pid, self.child_fd = pty.fork()

    # https://stackoverflow.com/questions/4022600/python-pty-fork-how-does-it-work
    if self.pid == 0:
        # Child process: a successful execve() never returns, so the only
        # ways out of this branch are the early return or a raised error.
        _cmd_history(self.cmd)

        try:
            executable = self.cmd[0]
            argv = list(self.cmd)
            child_env = {**os.environ, **self.environment_vars}
            os.execve(executable, argv, child_env)
        except FileNotFoundError:
            error(f'{self.cmd[0]} does not exist.')
            self.exit_code = 1
            return False

    # Only the parent process moves back to the original working directory
    os.chdir(original_cwd)

    self.started = True
    self.poll_object.register(self.child_fd, EPOLLIN | EPOLLHUP)

    return True
| 222 | |
def decode(self, encoding: str = 'UTF-8') -> str:
    """Return the contents of ``self._trace_log`` decoded with *encoding*.

    Args:
        encoding: Codec name handed to ``decode``; defaults to ``'UTF-8'``.

    Returns:
        The decoded text.
    """
    raw_log = self._trace_log
    return raw_log.decode(encoding)
no test coverage detected