(
create_protocol, alpn: list[str]
)
| 471 | |
@asynccontextmanager
async def quic_server(
    create_protocol, alpn: list[str]
) -> AsyncGenerator[Address, None]:
    """Run a QUIC server on the loopback interface for the duration of the context.

    The server is configured with the given ALPN protocols, a maximum
    datagram frame size of 65536, and the ``trusted-leaf`` certificate/key
    pair resolved through ``tlsdata.path``.

    Args:
        create_protocol: Forwarded unchanged to ``QuicServer`` as its
            ``create_protocol`` argument.
        alpn: ALPN protocol identifiers to set on the configuration.

    Yields:
        The ``sockname`` reported by the datagram transport, i.e. the
        address the endpoint was actually bound to.
    """
    quic_config = QuicConfiguration(
        is_client=False,
        alpn_protocols=alpn,
        max_datagram_frame_size=65536,
    )
    cert_path = tlsdata.path("../net/data/verificationcerts/trusted-leaf.crt")
    key_path = tlsdata.path("../net/data/verificationcerts/trusted-leaf.key")
    quic_config.load_cert_chain(certfile=cert_path, keyfile=key_path)

    def build_server():
        # Protocol factory handed to the event loop; invoked when the
        # datagram endpoint is created.
        return QuicServer(
            configuration=quic_config,
            create_protocol=create_protocol,
        )

    # Port 0 asks the OS for a free port; the real one is read back from
    # the transport below.
    endpoint, quic_listener = await asyncio.get_running_loop().create_datagram_endpoint(
        build_server,
        local_addr=("127.0.0.1", 0),
    )
    try:
        yield endpoint.get_extra_info("sockname")
    finally:
        # Always shut the server down, even if the context body raised.
        quic_listener.close()
| 497 | |
| 498 | |
| 499 | class QuicClient(QuicConnectionProtocol): |
no test coverage detected
searching dependent graphs…