Establish QUIC between client and proxy.
(self, tls_start: quic.QuicTlsData)
| 377 | tls_start.ssl_conn.set_connect_state() |
| 378 | |
def quic_start_client(self, tls_start: quic.QuicTlsData) -> None:
    """Fill in the QUIC TLS settings for the client-to-proxy connection.

    Does nothing when ``tls_start.settings`` is already set, so settings
    supplied earlier (e.g. by a user addon) are left untouched. Otherwise a
    fresh ``quic.QuicTlsSettings`` is attached to ``tls_start`` and filled
    with cipher suites, ALPN protocols and the certificate material returned
    by ``self.get_cert``.
    """
    if tls_start.settings is not None:
        # Somebody else (e.g. a user addon) already provided the settings.
        return
    settings = quic.QuicTlsSettings()
    tls_start.settings = settings

    # keep the following part in sync with `tls_start_client`
    # Type narrowing only: `assert` is stripped under `python -O`, so this is
    # not a runtime guarantee.
    assert isinstance(tls_start.conn, connection.Client)

    client: connection.Client = tls_start.conn
    server: connection.Server = tls_start.context.server

    entry = self.get_cert(tls_start.context)

    # The `ciphers_client` option is only a fallback for connections that do
    # not already carry their own cipher list.
    if not client.cipher_list and ctx.options.ciphers_client:
        client.cipher_list = ctx.options.ciphers_client.split(":")

    # Optionally extend the chain presented to the client with the
    # certificates recorded on the upstream server connection.
    if ctx.options.add_upstream_certs_to_client_chain:  # pragma: no cover
        extra_chain_certs = server.certificate_list
    else:
        extra_chain_certs = []

    # set context parameters
    if client.cipher_list:
        # NOTE(review): `CipherSuite[name]` is a lookup by name; if CipherSuite
        # is an Enum, an unknown name raises KeyError here. Confirm that the
        # configured names are validated before this hook runs.
        suites = []
        for name in client.cipher_list:
            suites.append(CipherSuite[name])
        settings.cipher_suites = suites

    # Prefer protocols already fixed on either connection; if we don't have
    # any, we allow all offered by the client.
    negotiated = [proto for proto in (client.alpn, server.alpn) if proto]
    candidates = negotiated or client.alpn_offers
    # NOTE(review): `alpn_offers` presumably originates from the peer's
    # handshake; `.decode("ascii")` raises UnicodeDecodeError on non-ASCII
    # bytes. Confirm the caller handles that.
    settings.alpn_protocols = [proto.decode("ascii") for proto in candidates]

    # set the certificates
    # `_cert` is a private attribute of the certificate wrapper objects.
    settings.certificate = entry.cert._cert
    settings.certificate_private_key = entry.privatekey
    full_chain = [*entry.chain_certs, *extra_chain_certs]
    settings.certificate_chain = [link._cert for link in full_chain]
| 419 | |
| 420 | def quic_start_server(self, tls_start: quic.QuicTlsData) -> None: |
| 421 | """Establish QUIC between proxy and server.""" |