(monkeypatch, caplog)
| 427 | |
@skip_not_linux
async def test_tun_mode(monkeypatch, caplog):
    """Round-trip a TCP payload through a real TUN-mode proxy instance.

    ``ConnectionHandler.handle_client`` is replaced with ``_echo_server`` so
    the test exercises only the TUN plumbing.  A client socket is bound to
    the freshly created TUN device, sends ``b"hello"`` and expects
    ``b"HELLO"`` back.

    The test is skipped when the TUN device cannot be created because the
    process lacks the required privileges.
    """
    monkeypatch.setattr(ConnectionHandler, "handle_client", _echo_server)

    with taddons.context(Proxyserver()):
        inst = TunInstance.make("tun", MagicMock())
        # No device name is assigned until the instance has been started.
        assert inst.tun_name is None
        try:
            await inst.start()
        except RuntimeError as e:
            if "Operation not permitted" in str(e):
                # pytest.skip() raises, so no explicit return is needed.
                pytest.skip("tun mode test must be run as root")
            raise

        # From here on the instance is started: always stop it again, even
        # when an assertion or the connection attempt below fails.
        try:
            assert inst.tun_name
            assert inst.is_running
            assert "tun_name" in inst.to_json()

            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                # Bind to the TUN device so the connection is sent through it.
                s.setsockopt(
                    socket.SOL_SOCKET,
                    socket.SO_BINDTODEVICE,
                    inst.tun_name.encode(),
                )
                # loop.sock_connect() is documented to require a
                # non-blocking socket.
                s.setblocking(False)
                # 192.0.2.1 lies in the RFC 5737 documentation range.
                await asyncio.get_running_loop().sock_connect(
                    s, ("192.0.2.1", 1234)
                )
                reader, writer = await asyncio.open_connection(sock=s)
            except BaseException:
                # The stream never took ownership of the socket; close it
                # ourselves so the file descriptor is not leaked.
                s.close()
                raise

            try:
                writer.write(b"hello")
                await writer.drain()
                assert await reader.readexactly(5) == b"HELLO"
            finally:
                writer.close()
                await writer.wait_closed()
        finally:
            await inst.stop()
| 455 | |
| 456 | |
| 457 | async def test_tun_mode_mocked(monkeypatch): |
nothing calls this directly
no test coverage detected
searching dependent graphs…