(
cls: type[T],
alpn: list[str],
address: Address,
)
| 657 | |
@asynccontextmanager
async def quic_connect(
    cls: type[T],
    alpn: list[str],
    address: Address,
) -> AsyncGenerator[T, None]:
    """Open a client-side QUIC connection to ``address`` and yield its protocol.

    A datagram endpoint bound to ``127.0.0.1`` (port 0) is created on the
    running event loop, with ``cls`` wrapping a fresh ``QuicConnection`` as the
    protocol. The connection is configured as a client with the given ALPN
    list, the fixed server name ``example.mitmproxy.org``, no certificate
    verification (``ssl.CERT_NONE``) and a maximum datagram frame size of 65536.

    Args:
        cls: Protocol class to instantiate; it is called with the
            ``QuicConnection`` as its only argument.
        alpn: ALPN protocol identifiers passed to the QUIC configuration.
        address: Remote address handed to ``protocol.connect``.

    Yields:
        The ``cls`` instance once ``wait_handshake()`` has returned.

    On exit (normal or exceptional) the protocol is closed and awaited, and
    the underlying transport is always closed afterwards.
    """
    configuration = QuicConfiguration(
        is_client=True,
        alpn_protocols=alpn,
        server_name="example.mitmproxy.org",
        verify_mode=ssl.CERT_NONE,
        max_datagram_frame_size=65536,
    )
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: cls(QuicConnection(configuration=configuration)),
        local_addr=("127.0.0.1", 0),
    )
    # The factory above always builds a `cls`; this narrows the type for
    # static checkers, since create_datagram_endpoint is typed generically.
    assert isinstance(protocol, cls)
    try:
        protocol.connect(address)
        await protocol.wait_handshake()
        yield protocol
    finally:
        # Nested try/finally: if closing the protocol raises, or the await is
        # cancelled, the datagram transport must still be released, otherwise
        # the UDP socket would leak.
        try:
            protocol.close()
            await protocol.wait_closed()
        finally:
            transport.close()
| 685 | |
| 686 | |
| 687 | async def _test_echo(client: H3Client, strict: bool) -> None: |
no test coverage detected
searching dependent graphs…