()
| 122 | |
| 123 | |
| 124 | def _setup_daemon_for_windows(): |
| 125 | if not is_admin_user(): |
| 126 | raise Fatal('You must be administrator to set the firewall') |
| 127 | |
| 128 | signal.signal(signal.SIGTERM, firewall_exit) |
| 129 | signal.signal(signal.SIGINT, firewall_exit) |
| 130 | |
| 131 | com_chan = os.environ.get('SSHUTTLE_FW_COM_CHANNEL') |
| 132 | if com_chan == 'stdio': |
| 133 | debug3('Using inherited stdio for communicating with sshuttle client process') |
| 134 | else: |
| 135 | debug3('Using shared socket for communicating with sshuttle client process') |
| 136 | socket_share_data = base64.b64decode(com_chan) |
| 137 | sock = socket.fromshare(socket_share_data) # type: socket.socket |
| 138 | sys.stdin = io.TextIOWrapper(sock.makefile('rb', buffering=0)) |
| 139 | sys.stdout = io.TextIOWrapper(sock.makefile('wb', buffering=0), write_through=True) |
| 140 | sock.close() |
| 141 | return sys.stdin.buffer, sys.stdout.buffer |
| 142 | |
| 143 | |
| 144 | # Isolate function that needs to be replaced for tests |
nothing calls this directly
no test coverage detected