MCPcopy
hub / github.com/jtesta/ssh-audit / main

Function main

src/ssh_audit/ssh_audit.py:1500–1579  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

1498
1499
1500def main() -> int:
1501 out = OutputBuffer()
1502 aconf = process_commandline(out, sys.argv[1:], usage)
1503
1504 # If we're on Windows, but the colorama module could not be imported, print a warning if we're in verbose mode.
1505 if (sys.platform == 'win32') and ('colorama' not in sys.modules):
1506 out.v("WARNING: colorama module not found. Colorized output will be disabled.", write_now=True)
1507
1508 # If we're outputting JSON, turn off colors and ensure 'info' level messages go through.
1509 if aconf.json:
1510 out.json = True
1511 out.use_colors = False
1512
1513 if aconf.manual:
1514 # If the colorama module was not imported, turn off colors in order
1515 # to output a plain text version of the man page.
1516 if (sys.platform == 'win32') and ('colorama' not in sys.modules):
1517 out.use_colors = False
1518 retval = builtin_manual(out)
1519 out.write()
1520 sys.exit(retval)
1521
1522 if aconf.lookup != '':
1523 retval = algorithm_lookup(out, aconf.lookup)
1524 out.write()
1525 sys.exit(retval)
1526
1527 # If multiple targets were specified...
1528 if len(aconf.target_list) > 0:
1529 ret = exitcodes.GOOD
1530
1531 # If JSON output is desired, each target's results will be reported in its own list entry.
1532 if aconf.json:
1533 print('[', end='')
1534
1535 # Loop through each target in the list. Entries can specify a port number to use, otherwise the value provided on the command line (--port=N) will be used by default (set to 22 if --port is not used).
1536 target_servers = []
1537 for _, target in enumerate(aconf.target_list):
1538 host, port = Utils.parse_host_and_port(target, default_port=aconf.port)
1539 target_servers.append((host, port))
1540
1541 # A ranked list of return codes. Those with higher indices will take precedence over lower ones. For example, if three servers are scanned, yielding WARNING, GOOD, and UNKNOWN_ERROR, the overall result will be UNKNOWN_ERROR, since its index is the highest. Errors have highest priority, followed by failures, then warnings.
1542 ranked_return_codes = [exitcodes.GOOD, exitcodes.WARNING, exitcodes.FAILURE, exitcodes.CONNECTION_ERROR, exitcodes.UNKNOWN_ERROR]
1543
1544 # Queue all worker threads.
1545 num_target_servers = len(target_servers)
1546 num_processed = 0
1547 out.v("Scanning %u targets with %s%u threads..." % (num_target_servers, '(at most) ' if aconf.threads > num_target_servers else '', aconf.threads), write_now=True)
1548 with concurrent.futures.ThreadPoolExecutor(max_workers=aconf.threads) as executor:
1549 future_to_server = {executor.submit(target_worker_thread, target_server[0], target_server[1], aconf): target_server for target_server in target_servers}
1550 for future in concurrent.futures.as_completed(future_to_server):
1551 worker_ret, worker_output = future.result()
1552
1553 # If this worker's return code is ranked higher than what we've cached so far, update our cache.
1554 if ranked_return_codes.index(worker_ret) > ranked_return_codes.index(ret):
1555 ret = worker_ret
1556
1557 # print("Worker for %s:%d returned %d: [%s]" % (target_server[0], target_server[1], worker_ret, worker_output))

Callers 3

ssh-audit.pyFile · 0.90
__main__.pyFile · 0.90
ssh_audit.pyFile · 0.85

Calls 9

vMethod · 0.95
writeMethod · 0.95
OutputBufferClass · 0.90
process_commandlineFunction · 0.85
builtin_manualFunction · 0.85
algorithm_lookupFunction · 0.85
auditFunction · 0.85
parse_host_and_portMethod · 0.80
thread_exitMethod · 0.45

Tested by

no test coverage detected