(self, message: Union[str, bytes])
| 723 | return cast("WebSocketRoute", self._server) |
| 724 | |
def send(self, message: Union[str, bytes]) -> None:
    """Queue ``message`` for delivery through the channel's ``sendToPage`` call.

    Text is passed through unchanged with ``isBase64`` set to ``False``.
    Anything that is not a ``str`` is base64-encoded, decoded to text, and
    flagged with ``isBase64`` set to ``True``.

    The channel call is handed to ``_create_task_and_ignore_exception``
    together with ``self._loop``; this method itself returns ``None``
    without waiting on the result.
    NOTE(review): presumably failures of the send are swallowed by that
    helper, as its name suggests -- confirm against its definition.

    Args:
        message: The text or binary payload to send.
    """
    if isinstance(message, str):
        payload = {"message": message, "isBase64": False}
    else:
        # Binary data is transported as base64 text.
        encoded = base64.b64encode(message).decode()
        payload = {"message": encoded, "isBase64": True}

    _create_task_and_ignore_exception(
        self._loop,
        self._channel.send("sendToPage", None, payload),
    )
| 745 | |
def on_message(self, handler: Callable[[Union[str, bytes]], Any]) -> None:
    """Register ``handler`` as the page-message callback.

    The callable is stored on ``self._on_page_message``, replacing any
    handler stored there earlier. Nothing is invoked here.

    Args:
        handler: Callable accepting a single ``str`` or ``bytes`` message.
    """
    self._on_page_message = handler
no test coverage detected