| 47 | |
| 48 | |
| 49 | class SSH_Socket(ReadBuf, WriteBuf): |
class InsufficientReadException(Exception):
    """Exception type scoped to SSH_Socket.

    NOTE(review): the name suggests it signals that a read returned less
    data than required; the raise sites are outside this view -- confirm.
    """
| 52 | |
# NOTE(review): state value; presumably recorded in self.__state (which
# starts at 0) once the banner has been sent -- confirm against its users.
SM_BANNER_SENT = 1
| 54 | |
def __init__(  # pylint: disable=dangerous-default-value
    self,
    outputbuffer: 'OutputBuffer',
    host: Optional[str],
    port: int,
    ip_version_preference: List[int] = [],
    timeout: Union[int, float] = 5,
    timeout_set: bool = False,
) -> None:
    """Set up the socket wrapper's state; no connection is opened here.

    Parameters
    ----------
    outputbuffer
        Output buffer object kept for later use by this instance.
    host
        Target host. Must not be None.
    port
        Target port. Passed through Utils.parse_int() and required to be
        within 1..65535.
    ip_version_preference
        IP version selection consumed by _resolve(): an empty list means no
        preference, a single entry ([4] or [6]) restricts resolution to that
        version, and two entries ([4, 6] or [6, 4]) allow both with the
        first one preferred.
    timeout
        Timeout value stored for later use.
    timeout_set
        Flag stored alongside the timeout.
        NOTE(review): presumably records whether the caller set the timeout
        explicitly -- confirm against the caller.

    Raises
    ------
    ValueError
        If host is None ('undefined host') or the port is out of range
        ('invalid port: ...').
    """
    super(SSH_Socket, self).__init__()

    # Connection-related state; nothing is connected yet.
    self.__outputbuffer = outputbuffer
    self.__sock: Optional[socket.socket] = None
    self.__sock_map: Dict[int, socket.socket] = {}
    self.__block_size = 8
    self.__state = 0
    self.__header: List[str] = []
    self.__banner: Optional[Banner] = None

    # Validate the target before recording it.
    if host is None:
        raise ValueError('undefined host')
    port_number = Utils.parse_int(port)
    if not 1 <= port_number <= 65535:
        raise ValueError('invalid port: {}'.format(port))

    self.__host = host
    self.__port = port_number
    self.__ip_version_preference = ip_version_preference
    self.__timeout = timeout
    self.__timeout_set = timeout_set

    # Peer details; left unset at construction time.
    self.client_host: Optional[str] = None
    self.client_port = None
| 76 | |
def _resolve(self) -> Iterable[Tuple[int, Tuple[Any, ...]]]:
    """Resolve the configured host and port, yielding (family, sockaddr) pairs.

    The IP version preference list controls the lookup:
      * one entry  -> only that IP version is requested (4 selects IPv4,
        anything else selects IPv6);
      * two entries -> both versions are requested and the results are
        ordered by address family so the preferred type comes first;
      * otherwise  -> any family, in the order getaddrinfo() returns.

    Only SOCK_STREAM results are yielded.

    Raises
    ------
    socket.gaierror
        Propagated from socket.getaddrinfo() if the hostname cannot be
        resolved.
    """
    preference = self.__ip_version_preference

    # Default to any address family; a single-entry preference narrows it.
    family = socket.AF_UNSPEC
    if len(preference) == 1:
        family = socket.AF_INET if preference[0] == 4 else socket.AF_INET6

    candidates = socket.getaddrinfo(self.__host, self.__port, family, socket.SOCK_STREAM)

    # With a two-entry preference, order by address family: ascending when
    # IPv4 is preferred, descending when IPv6 is listed first.
    if len(preference) == 2:
        prefer_v6 = (preference[0] == 6)
        candidates = sorted(candidates, key=lambda info: info[0], reverse=prefer_v6)

    for af, socktype, _proto, _canonname, addr in candidates:
        if socktype != socket.SOCK_STREAM:
            continue
        yield af, addr
| 98 | |
| 99 | # Listens on a server socket and accepts one connection (used for |
| 100 | # auditing client connections). |
| 101 | def listen_and_accept(self) -> None: |
| 102 | |
| 103 | try: |
| 104 | # Socket to listen on all IPv4 addresses. |
| 105 | s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 106 | s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
no outgoing calls
no test coverage detected