(self, size: int = 2048)
| 203 | return self.__banner, self.__header, e |
| 204 | |
| 205 | def recv(self, size: int = 2048) -> Tuple[int, Optional[str]]: |
| 206 | if self.__sock is None: |
| 207 | return -1, 'not connected' |
| 208 | try: |
| 209 | data = self.__sock.recv(size) |
| 210 | except socket.timeout: |
| 211 | return -1, 'timed out' |
| 212 | except socket.error as e: |
| 213 | if e.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK): |
| 214 | return 0, 'retry' |
| 215 | return -1, str(e.args[-1]) |
| 216 | if len(data) == 0: |
| 217 | return -1, None |
| 218 | pos = self._buf.tell() |
| 219 | self._buf.seek(0, 2) |
| 220 | self._buf.write(data) |
| 221 | self._len += len(data) |
| 222 | self._buf.seek(pos, 0) |
| 223 | return len(data), None |
| 224 | |
| 225 | def send(self, data: bytes) -> Tuple[int, Optional[str]]: |
| 226 | if self.__sock is None: |
no test coverage detected