(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False)
| 1191 | |
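| 1192 | # Returns one of the exitcodes.* flags. On a connection error with an empty aconf.target_list, it does not return: it calls sys.exit(exitcodes.CONNECTION_ERROR). |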
| 1193 | def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int: |
| 1194 | program_retval = exitcodes.GOOD |
| 1195 | out.batch = aconf.batch |
| 1196 | out.verbose = aconf.verbose |
| 1197 | out.debug = aconf.debug |
| 1198 | out.level = aconf.level |
| 1199 | out.use_colors = aconf.colors |
| 1200 | s = SSH_Socket(out, aconf.host, aconf.port, aconf.ip_version_preference, aconf.timeout, aconf.timeout_set) |
| 1201 | |
| 1202 | if aconf.client_audit: |
| 1203 | out.v("Listening for client connection on port %d..." % aconf.port, write_now=True) |
| 1204 | s.listen_and_accept() |
| 1205 | else: |
| 1206 | out.v("Starting audit of %s:%d..." % ('[%s]' % aconf.host if Utils.is_ipv6_address(aconf.host) else aconf.host, aconf.port), write_now=True) |
| 1207 | err = s.connect() |
| 1208 | |
| 1209 | if err is not None: |
| 1210 | out.fail(err) |
| 1211 | |
| 1212 | # If we're running against multiple targets, return a connection error to the calling worker thread. Otherwise, write the error message to the console and exit. |
| 1213 | if len(aconf.target_list) > 0: |
| 1214 | return exitcodes.CONNECTION_ERROR |
| 1215 | else: |
| 1216 | out.write() |
| 1217 | sys.exit(exitcodes.CONNECTION_ERROR) |
| 1218 | |
| 1219 | if sshv is None: |
| 1220 | sshv = 2 if aconf.ssh2 else 1 |
| 1221 | err = None |
| 1222 | banner, header, err = s.get_banner(sshv) |
| 1223 | if banner is None: |
| 1224 | if err is None: |
| 1225 | err = '[exception] did not receive banner.' |
| 1226 | else: |
| 1227 | err = '[exception] did not receive banner: {}'.format(err) |
| 1228 | if err is None: |
| 1229 | s.send_kexinit() # Send the algorithms we support (except we don't since this isn't a real SSH connection). |
| 1230 | |
| 1231 | packet_type, payload = s.read_packet(sshv) |
| 1232 | if packet_type < 0: |
| 1233 | try: |
| 1234 | if len(payload) > 0: |
| 1235 | payload_txt = payload.decode('utf-8') |
| 1236 | else: |
| 1237 | payload_txt = 'empty' |
| 1238 | except UnicodeDecodeError: |
| 1239 | payload_txt = '"{}"'.format(repr(payload).lstrip('b')[1:-1]) |
| 1240 | if payload_txt == 'Protocol major versions differ.': |
| 1241 | if sshv == 2 and aconf.ssh1: |
| 1242 | ret = audit(out, aconf, 1) |
| 1243 | out.write() |
| 1244 | return ret |
| 1245 | err = '[exception] error reading packet ({})'.format(payload_txt) |
| 1246 | else: |
| 1247 | err_pair = None |
| 1248 | if sshv == 1 and packet_type != Protocol.SMSG_PUBLIC_KEY: |
| 1249 | err_pair = ('SMSG_PUBLIC_KEY', Protocol.SMSG_PUBLIC_KEY) |
| 1250 | elif sshv == 2 and packet_type != Protocol.MSG_KEXINIT: |