(self, message: Union[str, bytes])
| 615 | ) |
| 616 | |
def send(self, message: Union[str, bytes]) -> None:
    """Dispatch ``message`` over the channel as a ``"sendToServer"`` call.

    A ``str`` message is passed through unchanged with ``isBase64`` set to
    ``False``. Any other value is base64-encoded, decoded to text, and sent
    with ``isBase64`` set to ``True``.

    The method returns ``None`` without awaiting anything: the result of
    ``self._ws._channel.send(...)`` is handed, together with
    ``self._ws._loop``, to ``_create_task_and_ignore_exception``.
    NOTE(review): judging by that helper's name, failures of the send are
    presumably swallowed (fire-and-forget) -- confirm against its definition.

    Args:
        message: Text or binary payload to forward.
    """
    is_binary = not isinstance(message, str)
    # Only the payload differs between text and binary; build it once.
    payload = {
        "message": base64.b64encode(message).decode() if is_binary else message,
        "isBase64": is_binary,
    }
    _create_task_and_ignore_exception(
        self._ws._loop,
        self._ws._channel.send("sendToServer", None, payload),
    )
| 634 | |
| 635 | |
| 636 | class WebSocketRoute(ChannelOwner): |
nothing calls this directly
no test coverage detected