MCPcopy
hub / github.com/jtesta/ssh-audit / recv_reply

Method recv_reply

src/ssh_audit/kexdh.py:82–151  ·  view source on GitHub ↗
(self, s: 'SSH_Socket', parse_host_key_size: bool = True)

Source from the content-addressed store, hash-verified

80 # contains the host key, among other things. Function returns the host
81 # key blob (from which the fingerprint can be calculated).
82 def recv_reply(self, s: 'SSH_Socket', parse_host_key_size: bool = True) -> Optional[bytes]:
83 # Reset the host key and CA info, in case they were set by a prior invocation.
84 self.__hostkey_type = ''
85 self.__hostkey_e = 0 # pylint: disable=unused-private-member
86 self.__hostkey_n = 0 # pylint: disable=unused-private-member
87 self.__hostkey_n_len = 0
88 self.__ca_key_type = ''
89 self.__ca_n_len = 0
90
91 packet_type, payload = s.read_packet(2)
92
93 # Skip any & all MSG_DEBUG messages.
94 while packet_type == Protocol.MSG_DEBUG:
95 packet_type, payload = s.read_packet(2)
96
97 if packet_type != -1 and packet_type not in [Protocol.MSG_KEXDH_REPLY, Protocol.MSG_KEXDH_GEX_REPLY]: # pylint: disable=no-else-raise
98 raise KexDHException('Expected MSG_KEXDH_REPLY (%d) or MSG_KEXDH_GEX_REPLY (%d), but got %d instead.' % (Protocol.MSG_KEXDH_REPLY, Protocol.MSG_KEXDH_GEX_REPLY, packet_type))
99 elif packet_type == -1:
100 # A connection error occurred. We can't parse anything, so just
101 # return. The host key modulus (and perhaps certificate modulus)
102 # will remain at length 0.
103 self.out.d("KexDH.recv_reply(): received package_type == -1.")
104 return None
105
106 # Get the host key blob, F, and signature.
107 ptr = 0
108 hostkey, _, ptr = KexDH.__get_bytes(payload, ptr)
109
110 # If we are not supposed to parse the host key size (i.e.: it is a type that is of fixed size such as ed25519), then stop here.
111 if not parse_host_key_size:
112 return hostkey
113
114 _, _, ptr = KexDH.__get_bytes(payload, ptr)
115 _, _, ptr = KexDH.__get_bytes(payload, ptr)
116
117 # Now pick apart the host key blob.
118 # Get the host key type (i.e.: 'ssh-rsa', 'ssh-ed25519', etc).
119 ptr = 0
120 hostkey_type, _, ptr = KexDH.__get_bytes(hostkey, ptr)
121 self.__hostkey_type = hostkey_type.decode('ascii')
122 self.out.d("Parsing host key type: %s" % self.__hostkey_type)
123
124 # If this is an RSA certificate, skip over the nonce.
125 if self.__hostkey_type.startswith('ssh-rsa-cert-v0'):
126 self.out.d("RSA certificate found, so skipping nonce.")
127 _, _, ptr = KexDH.__get_bytes(hostkey, ptr) # Read & skip over the nonce.
128
129 # The public key exponent.
130 hostkey_e, _, ptr = KexDH.__get_bytes(hostkey, ptr)
131 self.__hostkey_e = int(binascii.hexlify(hostkey_e), 16) # pylint: disable=unused-private-member
132
133 # ED25519 moduli are fixed at 32 bytes.
134 if self.__hostkey_type == 'ssh-ed25519':
135 self.out.d("%s has a fixed host key modulus of 32." % self.__hostkey_type)
136 self.__hostkey_n_len = 32
137 elif self.__hostkey_type == 'ssh-ed448':
138 self.out.d("%s has a fixed host key modulus of 57." % self.__hostkey_type)
139 self.__hostkey_n_len = 57

Callers 2

_send_initMethod · 0.80
perform_testMethod · 0.80

Calls 5

__parse_ca_keyMethod · 0.95
KexDHExceptionClass · 0.85
read_packetMethod · 0.80
dMethod · 0.80
__get_bytesMethod · 0.80

Tested by 2

_send_initMethod · 0.64
perform_testMethod · 0.64