(self)
| 55 | self.messages: SimpleQueue[str] = SimpleQueue() |
| 56 | |
def parse(self) -> Generator[None, None, None]:
    """Prompt for input lines forever and queue each one as text.

    Every iteration writes the ``"> "`` prompt to ``sys.stdout`` and
    flushes it, then delegates to ``self.reader.read_line`` until that
    sub-generator returns a line.  The line is decoded, stripped of any
    trailing ``\\r`` / ``\\n`` characters, and put on ``self.messages``.

    The loop has no exit condition, so this generator only stops when it
    is closed or when an exception propagates out of it.
    """
    prompt = "> "
    while True:
        # Flush explicitly: the prompt has no newline, so a line-buffered
        # stdout would otherwise hold it back.
        sys.stdout.write(prompt)
        sys.stdout.flush()
        # sys.maxsize is passed as read_line's only argument -- presumably
        # a line-length limit, making it effectively unbounded; confirm
        # against the reader's API.
        raw_line = yield from self.reader.read_line(sys.maxsize)
        # NOTE(review): raw_line is presumably bytes; decode() with no
        # arguments uses UTF-8 and strict error handling.
        text = raw_line.decode()
        self.messages.put(text.rstrip("\r\n"))
| 63 | |
| 64 | def connection_made(self, transport: asyncio.BaseTransport) -> None: |
| 65 | self.parser = self.parse() |
no test coverage detected