(self)
| 162 | return True |
| 163 | |
| 164 | def poll(self) -> None: |
| 165 | self.make_sure_we_are_executing() |
| 166 | |
| 167 | if self.child_fd: |
| 168 | got_output = False |
| 169 | for _fileno, _event in self.poll_object.poll(0.1): |
| 170 | try: |
| 171 | output = os.read(self.child_fd, 8192) |
| 172 | got_output = True |
| 173 | self.peak(output) |
| 174 | self._trace_log += output |
| 175 | except OSError: |
| 176 | self.ended = True |
| 177 | break |
| 178 | |
| 179 | if self.ended or (not got_output and not _pid_exists(self.pid)): |
| 180 | self.ended = True |
| 181 | try: |
| 182 | wait_status = os.waitpid(self.pid, 0)[1] |
| 183 | self.exit_code = os.waitstatus_to_exitcode(wait_status) |
| 184 | except ChildProcessError: |
| 185 | try: |
| 186 | wait_status = os.waitpid(self.child_fd, 0)[1] |
| 187 | self.exit_code = os.waitstatus_to_exitcode(wait_status) |
| 188 | except ChildProcessError: |
| 189 | self.exit_code = 1 |
| 190 | |
| 191 | def execute(self) -> bool: |
| 192 | import pty |
no test coverage detected