(tctx: Context)
| 223 | |
| 224 | |
| 225 | def test_udp(tctx: Context): |
| 226 | tctx.client.proxy_mode = ProxyMode.parse("reverse:udp://1.2.3.4:5") |
| 227 | flow = Placeholder(UDPFlow) |
| 228 | assert ( |
| 229 | Playbook(modes.ReverseProxy(tctx)) |
| 230 | << OpenConnection(tctx.server) |
| 231 | >> reply(None) |
| 232 | >> DataReceived(tctx.client, b"test-input") |
| 233 | << NextLayerHook(Placeholder(NextLayer)) |
| 234 | >> reply_next_layer(layers.UDPLayer) |
| 235 | << udp.UdpStartHook(flow) |
| 236 | >> reply() |
| 237 | << udp.UdpMessageHook(flow) |
| 238 | >> reply() |
| 239 | << SendData(tctx.server, b"test-input") |
| 240 | ) |
| 241 | assert tctx.server.address == ("1.2.3.4", 5) |
| 242 | assert len(flow().messages) == 1 |
| 243 | assert flow().messages[0].content == b"test-input" |
| 244 | |
| 245 | |
| 246 | @pytest.mark.parametrize("patch", [True, False]) |
nothing calls this directly
no test coverage detected
searching dependent graphs…