Create an RPC server that uses a websocket that connects to a proxy. Parameters ---------- url : str The url to be connected. key : str The key to identify the server.
(url, key="")
| 665 | |
| 666 | |
| 667 | def websocket_proxy_server(url, key=""): |
| 668 | """Create a RPC server that uses an websocket that connects to a proxy. |
| 669 | |
| 670 | Parameters |
| 671 | ---------- |
| 672 | url : str |
| 673 | The url to be connected. |
| 674 | |
| 675 | key : str |
| 676 | The key to identify the server. |
| 677 | """ |
| 678 | |
def create_on_message(conn):
    """Create the event-driven server handler bound to ``conn``.

    Parameters
    ----------
    conn : object
        Connection object; its ``write_message`` method is called with
        ``binary=True`` for every chunk handed to the callback below.

    Returns
    -------
    on_message
        Whatever ``_ffi_api.CreateEventDrivenServer`` returns for the
        callback, the name ``"WebSocketProxyServer"`` and ``"%toinit"``.
    """

    def _fsend(data):
        # Callback given to the event-driven server: writes the data to the
        # connection as one binary message and reports how many bytes it was.
        payload = bytes(data)
        conn.write_message(payload, binary=True)
        return len(payload)

    return _ffi_api.CreateEventDrivenServer(_fsend, "WebSocketProxyServer", "%toinit")
| 687 | |
@gen.coroutine
def _connect(key):
    """Register with the proxy at ``url`` and serve it until the socket closes.

    Connects to ``url`` (taken from the enclosing function), sends the
    handshake (``base.RPC_MAGIC``, the key length, then the key prefixed
    with ``"server:"``), checks the status code at the start of the reply,
    and then passes every received message to the event-driven server
    handler.  When the receive loop ends the current IOLoop is stopped.

    Parameters
    ----------
    key : str
        The key to identify the server; ``"server:"`` is prepended before
        it is sent.

    Raises
    ------
    RuntimeError
        If the connection is closed or the reply is shorter than 4 bytes
        during the handshake, if the proxy reports the key as already in
        use, or if the reply carries an unrecognised status code.
    """
    conn = yield websocket.websocket_connect(url)
    on_message = create_on_message(conn)
    temp = _server_env(None)
    try:
        # Start connection: magic number, key length, then the key itself.
        conn.write_message(struct.pack("<i", base.RPC_MAGIC), binary=True)
        key = "server:" + key
        # Encode once and take the length of the encoded form so the prefix
        # matches the payload actually sent; len(key) counts characters and
        # disagrees with the byte count for non-ASCII keys.
        key_bytes = key.encode("utf-8")
        conn.write_message(struct.pack("<i", len(key_bytes)), binary=True)
        conn.write_message(key_bytes, binary=True)

        msg = yield conn.read_message()
        # read_message() gives None once the connection is closed; validate
        # explicitly instead of with assert, which is stripped under -O.
        if msg is None or len(msg) < 4:
            raise RuntimeError(
                f"Handshake with {url} failed: connection closed or reply shorter than 4 bytes"
            )
        magic = struct.unpack("<i", msg[:4])[0]
        if magic == base.RPC_CODE_DUPLICATE:
            raise RuntimeError(f"key: {key} has already been used in proxy")
        if magic == base.RPC_CODE_MISMATCH:
            # Not fatal: the server keeps going after logging the mismatch.
            logging.info("RPCProxy do not have matching client key %s", key)
        elif magic != base.RPC_CODE_SUCCESS:
            raise RuntimeError(f"{url} is not RPC Proxy")
        # Anything after the 4-byte status code is handed to the handler.
        msg = msg[4:]

        logging.info("Connection established with remote")

        if msg:
            on_message(bytearray(msg), 3)

        while True:
            try:
                msg = yield conn.read_message()
                if msg is None:
                    break
                on_message(bytearray(msg), 3)
            except websocket.WebSocketClosedError:
                break
        logging.info("WebSocketProxyServer closed...")
    finally:
        # Release the server environment even when the handshake raises.
        temp.remove()
    # NOTE(review): as in the original, the IOLoop is only stopped on the
    # normal path; a failed handshake leaves it untouched - confirm intended.
    ioloop.IOLoop.current().stop()