MCPcopy
hub / github.com/jtesta/ssh-audit / audit

Function audit

src/ssh_audit/ssh_audit.py:1193–1307  ·  view source on GitHub ↗
(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False)

Source from the content-addressed store, hash-verified

1191
1192# Returns one of the exitcodes.* flags.
1193def audit(out: OutputBuffer, aconf: AuditConf, sshv: Optional[int] = None, print_target: bool = False) -> int:
1194 program_retval = exitcodes.GOOD
1195 out.batch = aconf.batch
1196 out.verbose = aconf.verbose
1197 out.debug = aconf.debug
1198 out.level = aconf.level
1199 out.use_colors = aconf.colors
1200 s = SSH_Socket(out, aconf.host, aconf.port, aconf.ip_version_preference, aconf.timeout, aconf.timeout_set)
1201
1202 if aconf.client_audit:
1203 out.v("Listening for client connection on port %d..." % aconf.port, write_now=True)
1204 s.listen_and_accept()
1205 else:
1206 out.v("Starting audit of %s:%d..." % ('[%s]' % aconf.host if Utils.is_ipv6_address(aconf.host) else aconf.host, aconf.port), write_now=True)
1207 err = s.connect()
1208
1209 if err is not None:
1210 out.fail(err)
1211
1212 # If we're running against multiple targets, return a connection error to the calling worker thread. Otherwise, write the error message to the console and exit.
1213 if len(aconf.target_list) > 0:
1214 return exitcodes.CONNECTION_ERROR
1215 else:
1216 out.write()
1217 sys.exit(exitcodes.CONNECTION_ERROR)
1218
1219 if sshv is None:
1220 sshv = 2 if aconf.ssh2 else 1
1221 err = None
1222 banner, header, err = s.get_banner(sshv)
1223 if banner is None:
1224 if err is None:
1225 err = '[exception] did not receive banner.'
1226 else:
1227 err = '[exception] did not receive banner: {}'.format(err)
1228 if err is None:
1229 s.send_kexinit() # Send the algorithms we support (except we don't since this isn't a real SSH connection).
1230
1231 packet_type, payload = s.read_packet(sshv)
1232 if packet_type < 0:
1233 try:
1234 if len(payload) > 0:
1235 payload_txt = payload.decode('utf-8')
1236 else:
1237 payload_txt = 'empty'
1238 except UnicodeDecodeError:
1239 payload_txt = '"{}"'.format(repr(payload).lstrip('b')[1:-1])
1240 if payload_txt == 'Protocol major versions differ.':
1241 if sshv == 2 and aconf.ssh1:
1242 ret = audit(out, aconf, 1)
1243 out.write()
1244 return ret
1245 err = '[exception] error reading packet ({})'.format(payload_txt)
1246 else:
1247 err_pair = None
1248 if sshv == 1 and packet_type != Protocol.SMSG_PUBLIC_KEY:
1249 err_pair = ('SMSG_PUBLIC_KEY', Protocol.SMSG_PUBLIC_KEY)
1250 elif sshv == 2 and packet_type != Protocol.MSG_KEXINIT:

Callers 2

target_worker_thread · Function · 0.85
main · Function · 0.85

Calls 15

listen_and_accept · Method · 0.95
connect · Method · 0.95
get_banner · Method · 0.95
send_kexinit · Method · 0.95
read_packet · Method · 0.95
SSH_Socket · Class · 0.90
DHEat · Class · 0.90
output · Function · 0.85
evaluate_policy · Function · 0.85
make_policy · Function · 0.85
v · Method · 0.80

Tested by

no test coverage detected