MCPcopy
hub / github.com/jtesta/ssh-audit / SSH_Socket

Class SSH_Socket

src/ssh_audit/ssh_socket.py:49–344  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

47
48
49class SSH_Socket(ReadBuf, WriteBuf):
50 class InsufficientReadException(Exception):
51 pass
52
53 SM_BANNER_SENT = 1
54
55 def __init__(self, outputbuffer: 'OutputBuffer', host: Optional[str], port: int, ip_version_preference: List[int] = [], timeout: Union[int, float] = 5, timeout_set: bool = False) -> None: # pylint: disable=dangerous-default-value
56 super(SSH_Socket, self).__init__()
57 self.__outputbuffer = outputbuffer
58 self.__sock: Optional[socket.socket] = None
59 self.__sock_map: Dict[int, socket.socket] = {}
60 self.__block_size = 8
61 self.__state = 0
62 self.__header: List[str] = []
63 self.__banner: Optional[Banner] = None
64 if host is None:
65 raise ValueError('undefined host')
66 nport = Utils.parse_int(port)
67 if nport < 1 or nport > 65535:
68 raise ValueError('invalid port: {}'.format(port))
69 self.__host = host
70 self.__port = nport
71 self.__ip_version_preference = ip_version_preference # Holds only 5 possible values: [] (no preference), [4] (use IPv4 only), [6] (use IPv6 only), [46] (use both IPv4 and IPv6, but prioritize v4), and [64] (use both IPv4 and IPv6, but prioritize v6).
72 self.__timeout = timeout
73 self.__timeout_set = timeout_set
74 self.client_host: Optional[str] = None
75 self.client_port = None
76
77 def _resolve(self) -> Iterable[Tuple[int, Tuple[Any, ...]]]:
78 """Resolves a hostname into a list of IPs
79 Raises
80 ------
81 socket.gaierror [Errno -2]
82 If the hostname cannot be resolved.
83 """
84 # If __ip_version_preference has only one entry, then it means that ONLY that IP version should be used.
85 if len(self.__ip_version_preference) == 1:
86 family = socket.AF_INET if self.__ip_version_preference[0] == 4 else socket.AF_INET6
87 else:
88 family = socket.AF_UNSPEC
89 stype = socket.SOCK_STREAM
90 r = socket.getaddrinfo(self.__host, self.__port, family, stype)
91
92 # If the user has a preference for using IPv4 over IPv6 (or vice-versa), then sort the list returned by getaddrinfo() so that the preferred address type comes first.
93 if len(self.__ip_version_preference) == 2:
94 r = sorted(r, key=lambda x: x[0], reverse=(self.__ip_version_preference[0] == 6)) # pylint: disable=superfluous-parens
95 for af, socktype, _proto, _canonname, addr in r:
96 if socktype == socket.SOCK_STREAM:
97 yield af, addr
98
99 # Listens on a server socket and accepts one connection (used for
100 # auditing client connections).
101 def listen_and_accept(self) -> None:
102
103 try:
104 # Socket to listen on all IPv4 addresses.
105 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
106 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

Callers 2

auditFunction · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected