hub / github.com/encode/uvicorn / test_max_concurrency

Function test_max_concurrency

tests/protocols/test_http.py:737–755
(http_protocol_cls: type[HTTPProtocol])

Source from the content-addressed store, hash-verified

async def test_max_concurrency(http_protocol_cls: type[HTTPProtocol]):
    app = Response("Hello, world", media_type="text/plain")

    protocol = get_connected_protocol(app, http_protocol_cls, limit_concurrency=1)
    protocol.data_received(SIMPLE_GET_REQUEST)
    await protocol.loop.run_one()
    assert (
        b"\r\n".join(
            [
                b"HTTP/1.1 503 Service Unavailable",
                b"content-type: text/plain; charset=utf-8",
                b"content-length: 19",
                b"connection: close",
                b"",
                b"Service Unavailable",
            ]
        )
        == protocol.transport.buffer
    )
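
The expected bytes in the assertion can be rebuilt independently to check that the headers and body agree. The following is a minimal sketch, not code from the uvicorn repository: `build_expected_503` is a hypothetical helper name, and it computes the `content-length` value from the body instead of hard-coding it.

```python
# Hypothetical helper (not part of uvicorn): rebuilds the raw response
# that test_max_concurrency compares against protocol.transport.buffer.
def build_expected_503() -> bytes:
    body = b"Service Unavailable"
    lines = [
        b"HTTP/1.1 503 Service Unavailable",
        b"content-type: text/plain; charset=utf-8",
        # Derive the length from the body so header and payload cannot drift apart.
        b"content-length: " + str(len(body)).encode("ascii"),
        b"connection: close",
        b"",  # empty element yields the blank line separating headers from body
        body,
    ]
    return b"\r\n".join(lines)


expected = build_expected_503()
# Split at the first blank line: everything before is the status line and headers.
head, _, payload = expected.partition(b"\r\n\r\n")
```

Here `payload` holds the response body and `head` holds the status line plus headers, so the `content-length: 19` header in the test can be checked against `len(payload)`.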

Callers

nothing calls this directly

Calls 5

Response · Class · 0.90
join · Method · 0.80
get_connected_protocol · Function · 0.70
data_received · Method · 0.45
run_one · Method · 0.45

Tested by

no test coverage detected