(self)
| 501 | |
| 502 | class TestProcessOutput(unittest.TestCase): |
def test_log_raw_single_line(self):
    """Check that one written line arrives on the raw output stream.

    Starts an executor, subscribes a storing observer to its raw output
    stream, writes a single piece of text as process output and asserts
    that the observer collected exactly that text as its only item.
    """
    self.create_and_start_executor(self._create_config())

    # Subscribe before anything is written so the observer sees the output.
    raw_sink = _StoringObserver()
    raw_stream = self.executor.get_raw_output_stream()
    raw_stream.subscribe(raw_sink)

    self.write_process_output('some text')
    # NOTE(review): presumably waits for buffered output to be delivered
    # to subscribers -- confirm against wait_buffer_flush's definition.
    wait_buffer_flush()

    self.assertEqual(['some text'], raw_sink.data)
| 515 | |
| 516 | def test_log_raw_single_buffer(self): |
| 517 | config = self._create_config() |
nothing calls this directly
no test coverage detected