(
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
)
| 635 | |
| 636 | class WebSocketRoute(ChannelOwner): |
| 637 | def __init__( |
| 638 | self, parent: ChannelOwner, type: str, guid: str, initializer: Dict |
| 639 | ) -> None: |
| 640 | super().__init__(parent, type, guid, initializer) |
| 641 | self._on_page_message: Optional[Callable[[Union[str, bytes]], Any]] = None |
| 642 | self._on_page_close: Optional[Callable[[Optional[int], Optional[str]], Any]] = ( |
| 643 | None |
| 644 | ) |
| 645 | self._on_server_message: Optional[Callable[[Union[str, bytes]], Any]] = None |
| 646 | self._on_server_close: Optional[ |
| 647 | Callable[[Optional[int], Optional[str]], Any] |
| 648 | ] = None |
| 649 | self._server = ServerWebSocketRoute(self) |
| 650 | self._connected = False |
| 651 | |
| 652 | self._channel.on("messageFromPage", self._channel_message_from_page) |
| 653 | self._channel.on("messageFromServer", self._channel_message_from_server) |
| 654 | self._channel.on("closePage", self._channel_close_page) |
| 655 | self._channel.on("closeServer", self._channel_close_server) |
| 656 | |
| 657 | def _channel_message_from_page(self, event: Dict) -> None: |
| 658 | if self._on_page_message: |
Nothing calls this directly.
No test coverage detected.