MCPcopy
hub / github.com/jtesta/ssh-audit / recv

Method recv

src/ssh_audit/ssh_socket.py:205–223  ·  view source on GitHub ↗
(self, size: int = 2048)

Source from the content-addressed store, hash-verified

203 return self.__banner, self.__header, e
204
def recv(self, size: int = 2048) -> Tuple[int, Optional[str]]:
    """Read up to *size* bytes from the socket and append them to the buffer.

    Received bytes are written at the end of ``self._buf`` and ``self._len``
    is increased by the number of bytes received. The buffer's current
    position is restored afterwards, so an in-progress read of the buffer
    is not disturbed.

    Returns a ``(count, error)`` pair:

    * ``(n, None)`` -- ``n`` bytes were received and buffered.
    * ``(0, 'retry')`` -- the socket raised EAGAIN / EWOULDBLOCK.
    * ``(-1, 'not connected')`` -- there is no socket to read from.
    * ``(-1, 'timed out')`` -- the socket raised ``socket.timeout``.
    * ``(-1, <message>)`` -- any other socket error; the message is the
      last exception argument, or ``str(exc)`` if it has none.
    * ``(-1, None)`` -- the socket's ``recv()`` returned no data.
    """
    if self.__sock is None:
        return -1, 'not connected'
    try:
        data = self.__sock.recv(size)
    except socket.timeout:
        return -1, 'timed out'
    except socket.error as e:
        # An exception built without arguments has an empty ``args`` tuple;
        # guard the indexing so the handler itself cannot raise IndexError.
        code = e.args[0] if e.args else None
        if code in (errno.EAGAIN, errno.EWOULDBLOCK):
            return 0, 'retry'
        return -1, str(e.args[-1]) if e.args else str(e)
    if not data:
        return -1, None
    # Append at the end of the buffer, then put the position back where it was.
    pos = self._buf.tell()
    self._buf.seek(0, 2)
    self._buf.write(data)
    self._len += len(data)
    self._buf.seek(pos, 0)
    return len(data), None
224
225 def send(self, data: bytes) -> Tuple[int, Optional[str]]:
226 if self.__sock is None:

Callers 5

get_bannerMethod · 0.95
ensure_readMethod · 0.95
_dh_rate_testMethod · 0.45
read_bannerMethod · 0.45
read_ssh_packetMethod · 0.45

Calls 1

writeMethod · 0.45

Tested by

no test coverage detected