(self, event: Event)
| 22 | child_layer: layer.Layer | None = None |
| 23 | |
| 24 | def _handle_event(self, event: Event) -> layer.CommandGenerator[None]: |
| 25 | if isinstance(event, Start): |
| 26 | yield Log(f"Got start. Server state: {self.context.server.state.name}") |
| 27 | elif isinstance(event, DataReceived) and event.data == b"client-hello": |
| 28 | yield SendData(self.context.client, b"client-hello-reply") |
| 29 | elif isinstance(event, DataReceived) and event.data == b"server-hello": |
| 30 | yield SendData(self.context.server, b"server-hello-reply") |
| 31 | elif isinstance(event, DataReceived) and event.data == b"open": |
| 32 | err = yield OpenConnection(self.context.server) |
| 33 | yield Log(f"Opened: {err=}. Server state: {self.context.server.state.name}") |
| 34 | elif isinstance(event, DataReceived) and event.data == b"half-close": |
| 35 | err = yield CloseTcpConnection(event.connection, half_close=True) |
| 36 | elif isinstance(event, ConnectionClosed): |
| 37 | yield Log(f"Got {event.connection.__class__.__name__.lower()} close.") |
| 38 | yield CloseConnection(event.connection) |
| 39 | else: |
| 40 | raise AssertionError |
| 41 | |
| 42 | |
| 43 | class TTunnelLayer(tunnel.TunnelLayer): |
nothing calls this directly
no test coverage detected