MCPcopy Index your code
hub / github.com/wavetermdev/waveterm / HandleWsInternal

Function HandleWsInternal

pkg/web/ws.go:253–313  ·  view source on GitHub ↗
(w http.ResponseWriter, r *http.Request)

Source from the content-addressed store, hash-verified

251}
252
253func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
254 stableId := r.URL.Query().Get("stableid")
255 if stableId == "" {
256 return fmt.Errorf("stableid is required")
257 }
258 err := authkey.ValidateIncomingRequest(r)
259 if err != nil {
260 w.WriteHeader(http.StatusUnauthorized)
261 w.Write([]byte(fmt.Sprintf("error validating authkey: %v", err)))
262 log.Printf("[websocket] error validating authkey: %v\n", err)
263 return err
264 }
265 conn, err := WebSocketUpgrader.Upgrade(w, r, nil)
266 if err != nil {
267 return fmt.Errorf("WebSocket Upgrade Failed: %v", err)
268 }
269 defer conn.Close()
270 wsConnId := uuid.New().String()
271 outputCh := make(chan any, WebSocketChannelSize)
272 closeCh := make(chan any)
273 log.Printf("[websocket] new connection: connid:%s stableid:%s\n", wsConnId, stableId)
274 eventbus.RegisterWSChannel(wsConnId, stableId, outputCh)
275 defer eventbus.UnregisterWSChannel(wsConnId)
276 wproxy := wshutil.MakeRpcProxyWithSize(fmt.Sprintf("ws:%s", stableId), WebSocketChannelSize, WebSocketChannelSize)
277 defer close(wproxy.ToRemoteCh)
278 registerConn(wsConnId, stableId, wproxy)
279 defer unregisterConn(wsConnId, stableId)
280 wg := &sync.WaitGroup{}
281 wg.Add(2)
282 go func() {
283 defer func() {
284 panichandler.PanicHandler("HandleWsInternal:outputCh", recover())
285 }()
286 // no waitgroup add here
287 // move values from rpcOutputCh to outputCh
288 for msgBytes := range wproxy.ToRemoteCh {
289 rpcWSMsg := map[string]any{
290 "eventtype": "rpc", // TODO don't hard code this (but def is in eventbus)
291 "data": json.RawMessage(msgBytes),
292 }
293 outputCh <- rpcWSMsg
294 }
295 }()
296 go func() {
297 defer func() {
298 panichandler.PanicHandler("HandleWsInternal:ReadLoop", recover())
299 }()
300 defer wg.Done()
301 ReadLoop(conn, outputCh, closeCh, wproxy.FromRemoteCh, stableId)
302 }()
303 go func() {
304 defer func() {
305 panichandler.PanicHandler("HandleWsInternal:WriteLoop", recover())
306 }()
307 defer wg.Done()
308 WriteLoop(conn, outputCh, closeCh, stableId)
309 }()
310 wg.Wait()

Callers 1

HandleWs (Function) · 0.85

Calls 15

ValidateIncomingRequest (Function) · 0.92
RegisterWSChannel (Function) · 0.92
UnregisterWSChannel (Function) · 0.92
MakeRpcProxyWithSize (Function) · 0.92
PanicHandler (Function) · 0.92
registerConn (Function) · 0.85
unregisterConn (Function) · 0.85
ReadLoop (Function) · 0.85
WriteLoop (Function) · 0.85
Write (Method) · 0.65
Close (Method) · 0.65
Wait (Method) · 0.65

Tested by

no test coverage detected