(http_protocol_cls: type[HTTPProtocol])
| 735 | |
| 736 | |
async def test_max_concurrency(http_protocol_cls: type[HTTPProtocol]):
    """Verify the 503 response written when ``limit_concurrency=1`` is set.

    A protocol is connected with ``limit_concurrency=1``, fed a single simple
    GET request, and the loop is stepped once. The transport buffer must then
    equal the plain-text "Service Unavailable" response built below.
    """
    app = Response("Hello, world", media_type="text/plain")
    protocol = get_connected_protocol(app, http_protocol_cls, limit_concurrency=1)

    protocol.data_received(SIMPLE_GET_REQUEST)
    await protocol.loop.run_one()

    # Status line, headers, blank separator line, then the 19-byte body.
    expected_lines = (
        b"HTTP/1.1 503 Service Unavailable",
        b"content-type: text/plain; charset=utf-8",
        b"content-length: 19",
        b"connection: close",
        b"",
        b"Service Unavailable",
    )
    expected_response = b"\r\n".join(expected_lines)

    # Keep the expected bytes on the left-hand side, as in the original check.
    assert expected_response == protocol.transport.buffer
| 756 | |
| 757 | |
| 758 | async def test_shutdown_during_request(http_protocol_cls: type[HTTPProtocol]): |
nothing calls this directly
no test coverage detected