(
protocol: BaseProtocol,
out_low_limit: WebSocketDataQueue,
parser_low_limit: PatchableWebSocketReader,
)
| 729 | |
| 730 | |
| 731 | def test_flow_control_binary( |
| 732 | protocol: BaseProtocol, |
| 733 | out_low_limit: WebSocketDataQueue, |
| 734 | parser_low_limit: PatchableWebSocketReader, |
| 735 | ) -> None: |
| 736 | large_payload = b"b" * (1 + 16 * 2) |
| 737 | large_payload_size = len(large_payload) |
| 738 | parser_low_limit._handle_frame(True, WSMsgType.BINARY, large_payload, 0) |
| 739 | res = out_low_limit._buffer[0] |
| 740 | assert res == (WSMessage(WSMsgType.BINARY, large_payload, ""), large_payload_size) |
| 741 | assert protocol._reading_paused is True |
| 742 | |
| 743 | |
| 744 | def test_flow_control_multi_byte_text( |
nothing calls this directly
no test coverage detected
searching dependent graphs…