(out: 'OutputBuffer', s: 'SSH_Socket', server_kex: 'SSH2_Kex', kex_str: str, kex_group: 'KexDH', host_key_types: Dict[str, Dict[str, bool]])
| 105 | |
| 106 | @staticmethod |
| 107 | def perform_test(out: 'OutputBuffer', s: 'SSH_Socket', server_kex: 'SSH2_Kex', kex_str: str, kex_group: 'KexDH', host_key_types: Dict[str, Dict[str, bool]]) -> None: |
| 108 | hostkey_modulus_size = 0 |
| 109 | ca_modulus_size = 0 |
| 110 | parsed_host_key_types = set() |
| 111 | |
| 112 | # If the connection still exists, close it so we can test |
| 113 | # using a clean slate (otherwise it may exist in a non-testable |
| 114 | # state). |
| 115 | if s.is_connected(): |
| 116 | s.close() |
| 117 | |
| 118 | # For each host key type... |
| 119 | for host_key_type in host_key_types: |
| 120 | key_fail_comments = [] |
| 121 | key_warn_comments = [] |
| 122 | |
| 123 | # Skip those already handled (i.e.: those in the RSA family, as testing one tests them all). |
| 124 | if host_key_type in parsed_host_key_types: |
| 125 | continue |
| 126 | |
| 127 | # If this host key type is supported by the server, we test it. |
| 128 | if host_key_type in server_kex.key_algorithms: |
| 129 | out.d('Preparing to obtain ' + host_key_type + ' host key...', write_now=True) |
| 130 | |
| 131 | cert = host_key_types[host_key_type]['cert'] |
| 132 | |
| 133 | # If the connection is closed, re-open it and get the kex again. |
| 134 | if not s.is_connected(): |
| 135 | err = s.connect() |
| 136 | if err is not None: |
| 137 | out.v(err, write_now=True) |
| 138 | return |
| 139 | |
| 140 | _, _, err = s.get_banner() |
| 141 | if err is not None: |
| 142 | out.v(err, write_now=True) |
| 143 | s.close() |
| 144 | return |
| 145 | |
| 146 | # Send our KEX using the specified group-exchange and most of the server's own values. |
| 147 | s.send_kexinit(key_exchanges=[kex_str], hostkeys=[host_key_type], ciphers=server_kex.server.encryption, macs=server_kex.server.mac, compressions=server_kex.server.compression, languages=server_kex.server.languages) |
| 148 | |
| 149 | try: |
| 150 | # Parse the server's KEX. |
| 151 | _, payload = s.read_packet() |
| 152 | SSH2_Kex.parse(out, payload) |
| 153 | except Exception: |
| 154 | msg = "Failed to parse server's kex." |
| 155 | if not out.debug: |
| 156 | msg += " Re-run in debug mode to see stack trace." |
| 157 | |
| 158 | out.v(msg, write_now=True) |
| 159 | out.d("Stack trace:\n%s" % str(traceback.format_exc()), write_now=True) |
| 160 | return |
| 161 | |
| 162 | # Do the initial DH exchange. The server responds back |
| 163 | # with the host key and its length. Bingo. We also get back the host key fingerprint. |
| 164 | kex_group.send_init(s) |
no test coverage detected