This layer establishes TLS for a single server connection.
| 465 | |
| 466 | |
class ServerTLSLayer(TLSLayer):
    """
    Establishes TLS for a single server connection.

    The handshake is either started right away or postponed until the
    client-side TLS child layer asks for the server connection to be opened
    (see `start_handshake` and `event_to_child`).
    """

    # True while the server handshake is postponed until the child layer
    # emits an OpenConnection command for our connection.
    wait_for_clienthello: bool = False

    def __init__(self, context: context.Context, conn: connection.Server | None = None):
        """
        Args:
            context: the proxy context this layer operates in.
            conn: the server connection to secure. When omitted (or falsy),
                `context.server` is used instead.
        """
        target = conn or context.server
        super().__init__(context, target)

    def start_handshake(self) -> layer.CommandGenerator[None]:
        """
        Start the server-side TLS handshake now, or postpone it.

        The handshake is postponed only if both of the following hold:

        - `command_to_reply_to` is not set, i.e. the connection was already
          open when this layer received its Start event (eager connection
          strategy), and
        - the immediate child layer is a `ClientTLSLayer`, i.e. we already
          know there is TLS on the client side as well. Waiting lets us
          incorporate SNI/ALPN from the parsed ClientHello.

        If `command_to_reply_to` is set, the child layer instructed us to open
        the connection, so any potential ClientHello has already been parsed
        (by the ClientTLS child layer) and there is nothing to wait for.
        """
        # Same condition as "not command_to_reply_to and child is ClientTLS",
        # negated; the isinstance check is still only evaluated when
        # command_to_reply_to is unset.
        if self.command_to_reply_to or not isinstance(
            self.child_layer, ClientTLSLayer
        ):
            yield from self.start_tls()
            if self.tls:
                # Kick off the handshake with an empty chunk of data.
                yield from self.receive_handshake_data(b"")
            return

        # Postpone: remember that we are waiting and mark the tunnel as closed.
        # NOTE(review): presumably the handshake is then started once the
        # connection is "opened" again by the child - confirm in the tunnel layer.
        self.wait_for_clienthello = True
        self.tunnel_state = tunnel.TunnelState.CLOSED

    def event_to_child(self, event: events.Event) -> layer.CommandGenerator[None]:
        """
        Forward `event` to the child layer and relay the resulting commands.

        While `wait_for_clienthello` is set, an OpenConnection command that
        targets this layer's own connection is not relayed; it only clears the
        flag. All other commands are passed on unchanged.
        """
        if not self.wait_for_clienthello:
            yield from super().event_to_child(event)
            return

        for cmd in super().event_to_child(event):
            opens_our_conn = (
                isinstance(cmd, commands.OpenConnection)
                and cmd.connection == self.conn
            )
            if opens_our_conn:
                # Swallow OpenConnection here by not re-yielding it.
                self.wait_for_clienthello = False
                continue
            yield cmd

    def on_handshake_error(self, err: str) -> layer.CommandGenerator[None]:
        """
        Log a warning about the failed server handshake, then defer to the
        parent class's error handling.

        Args:
            err: description of the handshake failure.
        """
        failure_notice = commands.Log(
            f"Server TLS handshake failed. {err}", level=WARNING
        )
        yield failure_notice
        yield from super().on_handshake_error(err)
| 513 | |
| 514 | |
| 515 | class ClientTLSLayer(TLSLayer): |
no outgoing calls
no test coverage detected
searching dependent graphs…