MCPcopy
hub / github.com/nonebot/nonebot2 / websocket_echo

Function websocket_echo

tests/fake_server.py:82–133  ·  view source on GitHub ↗
(request: Request)

Source from the content-addressed store, hash-verified

80
81
82def websocket_echo(request: Request) -> Response:
83 stream = request.environ["werkzeug.socket"]
84
85 ws = WSConnection(ConnectionType.SERVER)
86
87 in_data = b"GET %s HTTP/1.1\r\n" % request.path.encode("utf-8")
88 for header, value in request.headers.items():
89 in_data += f"{header}: {value}\r\n".encode()
90 in_data += b"\r\n"
91
92 ws.receive_data(in_data)
93
94 running: bool = True
95 while True:
96 out_data = b""
97
98 for event in ws.events():
99 if isinstance(event, WSRequest):
100 out_data += ws.send(AcceptConnection())
101 elif isinstance(event, CloseConnection):
102 out_data += ws.send(event.response())
103 running = False
104 elif isinstance(event, Ping):
105 out_data += ws.send(event.response())
106 elif isinstance(event, TextMessage):
107 if event.data == "quit":
108 out_data += ws.send(
109 CloseConnection(CloseReason.NORMAL_CLOSURE, "bye")
110 )
111 running = False
112 else:
113 out_data += ws.send(TextMessage(data=event.data))
114 elif isinstance(event, BytesMessage):
115 if event.data == b"quit":
116 out_data += ws.send(
117 CloseConnection(CloseReason.NORMAL_CLOSURE, "bye")
118 )
119 running = False
120 else:
121 out_data += ws.send(BytesMessage(data=event.data))
122
123 if out_data:
124 stream.send(out_data)
125
126 if not running:
127 break
128
129 in_data = stream.recv(4096)
130 ws.receive_data(in_data)
131
132 stream.shutdown(socket.SHUT_RDWR)
133 return Response("", status=204)
134
135
136@Request.application

Callers 1

request_handlerFunction · 0.85

Calls 4

ResponseClass · 0.85
shutdownMethod · 0.80
itemsMethod · 0.45
sendMethod · 0.45

Tested by

no test coverage detected