()
| 126 | |
| 127 | |
| 128 | async def test_inject() -> None: |
| 129 | async def server_handler( |
| 130 | reader: asyncio.StreamReader, writer: asyncio.StreamWriter |
| 131 | ): |
| 132 | while s := await reader.read(1): |
| 133 | writer.write(s.upper()) |
| 134 | |
| 135 | ps = Proxyserver() |
| 136 | nl = NextLayer() |
| 137 | state = HelperAddon() |
| 138 | |
| 139 | with taddons.context(ps, nl, state) as tctx: |
| 140 | async with tcp_server(server_handler) as addr: |
| 141 | tctx.configure(ps, listen_host="127.0.0.1", listen_port=0) |
| 142 | assert await ps.setup_servers() |
| 143 | ps.running() |
| 144 | proxy_addr = ps.servers["regular"].listen_addrs[0] |
| 145 | reader, writer = await asyncio.open_connection(*proxy_addr) |
| 146 | |
| 147 | req = f"CONNECT {addr[0]}:{addr[1]} HTTP/1.1\r\n\r\n" |
| 148 | writer.write(req.encode()) |
| 149 | assert ( |
| 150 | await reader.readuntil(b"\r\n\r\n") |
| 151 | == b"HTTP/1.1 200 Connection established\r\n\r\n" |
| 152 | ) |
| 153 | |
| 154 | writer.write(b"a") |
| 155 | assert await reader.read(1) == b"A" |
| 156 | ps.inject_tcp(state.flows[0], False, b"b") |
| 157 | assert await reader.read(1) == b"B" |
| 158 | ps.inject_tcp(state.flows[0], True, b"c") |
| 159 | assert await reader.read(1) == b"c" |
| 160 | |
| 161 | writer.close() |
| 162 | await writer.wait_closed() |
| 163 | await _wait_for_connection_closes(ps) |
| 164 | |
| 165 | |
| 166 | async def test_inject_fail(caplog) -> None: |
nothing calls this directly
no test coverage detected
searching dependent graphs…