()
| 78 | |
| 79 | |
async def test_udp():
    """Check UDP interception with the filter on, then with it deactivated.

    With ``intercept="~udp"`` configured, a flow from ``tflow.tudpflow()``
    must be marked intercepted after being cycled through the addon. Once
    ``intercept_active`` is set to ``False``, a fresh UDP flow must not be.
    """
    addon = intercept.Intercept()
    with taddons.context(addon) as ctx:
        # Phase 1: the "~udp" filter is configured, so the flow is intercepted.
        ctx.configure(addon, intercept="~udp")
        matched_flow = tflow.tudpflow()
        await ctx.cycle(addon, matched_flow)
        assert matched_flow.intercepted

        # Phase 2: interception switched off; a fresh flow must pass through.
        ctx.configure(addon, intercept_active=False)
        passthrough_flow = tflow.tudpflow()
        await ctx.cycle(addon, passthrough_flow)
        assert not passthrough_flow.intercepted
| 92 | |
| 93 | |
| 94 | async def test_websocket_message(): |