MCPcopy Index your code
hub / github.com/microsoft/playwright-python / ServerWebSocketRoute

Class ServerWebSocketRoute

playwright/_impl/_network.py:580–633  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

578
579
class ServerWebSocketRoute:
    """Server-facing view over a ``WebSocketRoute``.

    Every operation is delegated to the wrapped route: handlers are stored
    on it, ``url``/``protocols`` are read from its ``_initializer`` mapping,
    and ``send``/``close`` issue ``sendToServer``/``closeServer`` messages on
    its channel.
    """

    def __init__(self, ws: "WebSocketRoute"):
        # Keep a reference only; all state lives on the wrapped route.
        self._ws = ws

    def on_message(self, handler: Callable[[Union[str, bytes]], Any]) -> None:
        """Store *handler* on the wrapped route as ``_on_server_message``."""
        route = self._ws
        route._on_server_message = handler

    def on_close(
        self, handler: Callable[[Optional[int], Optional[str]], Any]
    ) -> None:
        """Store *handler* on the wrapped route as ``_on_server_close``."""
        route = self._ws
        route._on_server_close = handler

    def connect_to_server(self) -> None:
        """Always raise: connecting is only valid on the page-side route.

        Raises:
            NotImplementedError: unconditionally.
        """
        raise NotImplementedError(
            "connectToServer must be called on the page-side WebSocketRoute"
        )

    @property
    def url(self) -> str:
        """The ``"url"`` entry of the wrapped route's initializer."""
        initializer = self._ws._initializer
        return initializer["url"]

    @property
    def protocols(self) -> List[str]:
        """A fresh list of the initializer's ``"protocols"`` (empty if absent)."""
        declared = self._ws._initializer.get("protocols", [])
        return list(declared)

    def close(self, code: int = None, reason: str = None) -> None:
        """Send a ``closeServer`` message carrying *code* and *reason*.

        The channel call is handed to ``_create_task_and_ignore_exception``
        together with the route's loop; this method does not await it.
        ``wasClean`` is always sent as ``True``.
        """
        route = self._ws
        params = {
            "code": code,
            "reason": reason,
            "wasClean": True,
        }
        _create_task_and_ignore_exception(
            route._loop,
            route._channel.send("closeServer", None, params),
        )

    def send(self, message: Union[str, bytes]) -> None:
        """Send *message* to the server side via a ``sendToServer`` message.

        ``str`` payloads are sent verbatim with ``isBase64`` false; anything
        else is base64-encoded and flagged with ``isBase64`` true. The channel
        call is handed to ``_create_task_and_ignore_exception`` and is not
        awaited here.
        """
        if isinstance(message, str):
            params = {"message": message, "isBase64": False}
        else:
            # Non-text payloads travel as base64 text.
            encoded = base64.b64encode(message).decode()
            params = {"message": encoded, "isBase64": True}

        route = self._ws
        _create_task_and_ignore_exception(
            route._loop,
            route._channel.send("sendToServer", None, params),
        )
634
635
636class WebSocketRoute(ChannelOwner):

Callers 1

__init__Method · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected