| 80 | # contains the host key, among other things. Function returns the host |
| 81 | # key blob (from which the fingerprint can be calculated). |
| 82 | def recv_reply(self, s: 'SSH_Socket', parse_host_key_size: bool = True) -> Optional[bytes]: |
| 83 | # Reset the host key & CA info, in case any of it was set from a prior invocation. |
| 84 | self.__hostkey_type = '' |
| 85 | self.__hostkey_e = 0 # pylint: disable=unused-private-member |
| 86 | self.__hostkey_n = 0 # pylint: disable=unused-private-member |
| 87 | self.__hostkey_n_len = 0 |
| 88 | self.__ca_key_type = '' |
| 89 | self.__ca_n_len = 0 |
| 90 | |
| 91 | packet_type, payload = s.read_packet(2) |
| 92 | |
| 93 | # Skip any & all MSG_DEBUG messages. |
| 94 | while packet_type == Protocol.MSG_DEBUG: |
| 95 | packet_type, payload = s.read_packet(2) |
| 96 | |
| 97 | if packet_type != -1 and packet_type not in [Protocol.MSG_KEXDH_REPLY, Protocol.MSG_KEXDH_GEX_REPLY]: # pylint: disable=no-else-raise |
| 98 | raise KexDHException('Expected MSG_KEXDH_REPLY (%d) or MSG_KEXDH_GEX_REPLY (%d), but got %d instead.' % (Protocol.MSG_KEXDH_REPLY, Protocol.MSG_KEXDH_GEX_REPLY, packet_type)) |
| 99 | elif packet_type == -1: |
| 100 | # A connection error occurred. We can't parse anything, so just |
| 101 | # return. The host key modulus (and perhaps certificate modulus) |
| 102 | # will remain at length 0. |
| 103 | self.out.d("KexDH.recv_reply(): received package_type == -1.") |
| 104 | return None |
| 105 | |
| 106 | # Get the host key blob, F, and signature. |
| 107 | ptr = 0 |
| 108 | hostkey, _, ptr = KexDH.__get_bytes(payload, ptr) |
| 109 | |
| 110 | # If we are not supposed to parse the host key size (i.e.: it is a type that is of fixed size such as ed25519), then stop here. |
| 111 | if not parse_host_key_size: |
| 112 | return hostkey |
| 113 | |
| 114 | _, _, ptr = KexDH.__get_bytes(payload, ptr) |
| 115 | _, _, ptr = KexDH.__get_bytes(payload, ptr) |
| 116 | |
| 117 | # Now pick apart the host key blob. |
| 118 | # Get the host key type (i.e.: 'ssh-rsa', 'ssh-ed25519', etc). |
| 119 | ptr = 0 |
| 120 | hostkey_type, _, ptr = KexDH.__get_bytes(hostkey, ptr) |
| 121 | self.__hostkey_type = hostkey_type.decode('ascii') |
| 122 | self.out.d("Parsing host key type: %s" % self.__hostkey_type) |
| 123 | |
| 124 | # If this is an RSA certificate, skip over the nonce. |
| 125 | if self.__hostkey_type.startswith('ssh-rsa-cert-v0'): |
| 126 | self.out.d("RSA certificate found, so skipping nonce.") |
| 127 | _, _, ptr = KexDH.__get_bytes(hostkey, ptr) # Read & skip over the nonce. |
| 128 | |
| 129 | # The public key exponent. |
| 130 | hostkey_e, _, ptr = KexDH.__get_bytes(hostkey, ptr) |
| 131 | self.__hostkey_e = int(binascii.hexlify(hostkey_e), 16) # pylint: disable=unused-private-member |
| 132 | |
| 133 | # ED25519 & ED448 moduli are fixed at 32 & 57 bytes, respectively. |
| 134 | if self.__hostkey_type == 'ssh-ed25519': |
| 135 | self.out.d("%s has a fixed host key modulus of 32." % self.__hostkey_type) |
| 136 | self.__hostkey_n_len = 32 |
| 137 | elif self.__hostkey_type == 'ssh-ed448': |
| 138 | self.out.d("%s has a fixed host key modulus of 57." % self.__hostkey_type) |
| 139 | self.__hostkey_n_len = 57 |