| 63 | |
| 64 | |
| 65 | class _WriteStream(_Stream): |
| 66 | def __init__( |
| 67 | self, |
| 68 | write_handler: Callable[[str], int | None], |
| 69 | name: str, |
| 70 | encoding: str = "utf-8", |
| 71 | errors: str = "strict", |
| 72 | ): |
| 73 | super().__init__(name, encoding, errors) |
| 74 | self._write_handler = write_handler |
| 75 | |
| 76 | def writable(self) -> bool: |
| 77 | return True |
| 78 | |
| 79 | def write(self, s: str) -> int: |
| 80 | if self.closed: |
| 81 | raise ValueError("write to closed file") |
| 82 | s = str.encode(s, self.encoding, self.errors).decode(self.encoding, self.errors) |
| 83 | written = self._write_handler(s) |
| 84 | if written is None: |
| 85 | # They didn't tell us how much they wrote, assume it was the whole string |
| 86 | return len(s) |
| 87 | return written |
| 88 | |
| 89 | |
| 90 | class _ReadStream(_Stream): |
no outgoing calls
no test coverage detected
searching dependent graphs…