MCPcopy
hub / github.com/jtesta/ssh-audit / perform_test

Method perform_test

src/ssh_audit/hostkeytest.py:107–263  ·  view source on GitHub ↗
(out: 'OutputBuffer', s: 'SSH_Socket', server_kex: 'SSH2_Kex', kex_str: str, kex_group: 'KexDH', host_key_types: Dict[str, Dict[str, bool]])

Source from the content-addressed store, hash-verified

105
106 @staticmethod
107 def perform_test(out: 'OutputBuffer', s: 'SSH_Socket', server_kex: 'SSH2_Kex', kex_str: str, kex_group: 'KexDH', host_key_types: Dict[str, Dict[str, bool]]) -> None:
108 hostkey_modulus_size = 0
109 ca_modulus_size = 0
110 parsed_host_key_types = set()
111
112 # If the connection still exists, close it so we can test
113 # using a clean slate (otherwise it may exist in a non-testable
114 # state).
115 if s.is_connected():
116 s.close()
117
118 # For each host key type...
119 for host_key_type in host_key_types:
120 key_fail_comments = []
121 key_warn_comments = []
122
123 # Skip those already handled (i.e.: those in the RSA family, as testing one tests them all).
124 if host_key_type in parsed_host_key_types:
125 continue
126
127 # If this host key type is supported by the server, we test it.
128 if host_key_type in server_kex.key_algorithms:
129 out.d('Preparing to obtain ' + host_key_type + ' host key...', write_now=True)
130
131 cert = host_key_types[host_key_type]['cert']
132
133 # If the connection is closed, re-open it and get the kex again.
134 if not s.is_connected():
135 err = s.connect()
136 if err is not None:
137 out.v(err, write_now=True)
138 return
139
140 _, _, err = s.get_banner()
141 if err is not None:
142 out.v(err, write_now=True)
143 s.close()
144 return
145
146 # Send our KEX using the specified group-exchange and most of the server's own values.
147 s.send_kexinit(key_exchanges=[kex_str], hostkeys=[host_key_type], ciphers=server_kex.server.encryption, macs=server_kex.server.mac, compressions=server_kex.server.compression, languages=server_kex.server.languages)
148
149 try:
150 # Parse the server's KEX.
151 _, payload = s.read_packet()
152 SSH2_Kex.parse(out, payload)
153 except Exception:
154 msg = "Failed to parse server's kex."
155 if not out.debug:
156 msg += " Re-run in debug mode to see stack trace."
157
158 out.v(msg, write_now=True)
159 out.d("Stack trace:\n%s" % str(traceback.format_exc()), write_now=True)
160 return
161
162 # Do the initial DH exchange. The server responds back
163 # with the host key and its length. Bingo. We also get back the host key fingerprint.
164 kex_group.send_init(s)

Callers 1

run Method · 0.80

Calls 15

is_connected Method · 0.80
close Method · 0.80
d Method · 0.80
v Method · 0.80
get_banner Method · 0.80
send_kexinit Method · 0.80
read_packet Method · 0.80
recv_reply Method · 0.80
get_hostkey_size Method · 0.80
get_ca_type Method · 0.80
get_ca_size Method · 0.80
set_host_key Method · 0.80

Tested by

no test coverage detected