MCPcopy
hub / github.com/aio-libs/aiohttp / test_flow_control_binary

Function test_flow_control_binary

tests/test_websocket_parser.py:731–741  ·  view source on GitHub ↗
(
    protocol: BaseProtocol,
    out_low_limit: WebSocketDataQueue,
    parser_low_limit: PatchableWebSocketReader,
)

Source from the content-addressed store, hash-verified

729
730
731def test_flow_control_binary(
732 protocol: BaseProtocol,
733 out_low_limit: WebSocketDataQueue,
734 parser_low_limit: PatchableWebSocketReader,
735) -> None:
736 large_payload = b"b" * (1 + 16 * 2)
737 large_payload_size = len(large_payload)
738 parser_low_limit._handle_frame(True, WSMsgType.BINARY, large_payload, 0)
739 res = out_low_limit._buffer[0]
740 assert res == (WSMessage(WSMsgType.BINARY, large_payload, ""), large_payload_size)
741 assert protocol._reading_paused is True
742
743
744def test_flow_control_multi_byte_text(

Callers

nothing calls this directly

Calls 2

WSMessageClass · 0.90
_handle_frameMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…