()
| 64 | |
| 65 | |
| 66 | async def test_tcp(): |
| 67 | r = intercept.Intercept() |
| 68 | with taddons.context(r) as tctx: |
| 69 | tctx.configure(r, intercept="~tcp") |
| 70 | f = tflow.ttcpflow() |
| 71 | await tctx.cycle(r, f) |
| 72 | assert f.intercepted |
| 73 | |
| 74 | tctx.configure(r, intercept_active=False) |
| 75 | f = tflow.ttcpflow() |
| 76 | await tctx.cycle(r, f) |
| 77 | assert not f.intercepted |
| 78 | |
| 79 | |
| 80 | async def test_udp(): |