()
| 92 | |
| 93 | |
async def test_websocket_message():
    """Check that a WebSocket flow is intercepted only while interception is active.

    The addon is first configured with the body filter ``~b "hello binary"``
    and a WebSocket test flow is cycled through it; that flow must end up
    intercepted. Interception is then switched off via ``intercept_active``
    and a fresh flow must pass through without being intercepted.
    """
    addon = intercept.Intercept()
    with taddons.context(addon) as ctx:
        # Phase 1: filter configured -> the cycled flow is expected to be held.
        ctx.configure(addon, intercept='~b "hello binary"')
        held_flow = tflow.twebsocketflow()
        await ctx.cycle(addon, held_flow)
        assert held_flow.intercepted

        # Phase 2: interception disabled -> a new flow must not be held.
        ctx.configure(addon, intercept_active=False)
        passed_flow = tflow.twebsocketflow()
        await ctx.cycle(addon, passed_flow)
        assert not passed_flow.intercepted