(w http.ResponseWriter, r *http.Request)
| 251 | } |
| 252 | |
| 253 | func HandleWsInternal(w http.ResponseWriter, r *http.Request) error { |
| 254 | stableId := r.URL.Query().Get("stableid") |
| 255 | if stableId == "" { |
| 256 | return fmt.Errorf("stableid is required") |
| 257 | } |
| 258 | err := authkey.ValidateIncomingRequest(r) |
| 259 | if err != nil { |
| 260 | w.WriteHeader(http.StatusUnauthorized) |
| 261 | w.Write([]byte(fmt.Sprintf("error validating authkey: %v", err))) |
| 262 | log.Printf("[websocket] error validating authkey: %v\n", err) |
| 263 | return err |
| 264 | } |
| 265 | conn, err := WebSocketUpgrader.Upgrade(w, r, nil) |
| 266 | if err != nil { |
| 267 | return fmt.Errorf("WebSocket Upgrade Failed: %v", err) |
| 268 | } |
| 269 | defer conn.Close() |
| 270 | wsConnId := uuid.New().String() |
| 271 | outputCh := make(chan any, WebSocketChannelSize) |
| 272 | closeCh := make(chan any) |
| 273 | log.Printf("[websocket] new connection: connid:%s stableid:%s\n", wsConnId, stableId) |
| 274 | eventbus.RegisterWSChannel(wsConnId, stableId, outputCh) |
| 275 | defer eventbus.UnregisterWSChannel(wsConnId) |
| 276 | wproxy := wshutil.MakeRpcProxyWithSize(fmt.Sprintf("ws:%s", stableId), WebSocketChannelSize, WebSocketChannelSize) |
| 277 | defer close(wproxy.ToRemoteCh) |
| 278 | registerConn(wsConnId, stableId, wproxy) |
| 279 | defer unregisterConn(wsConnId, stableId) |
| 280 | wg := &sync.WaitGroup{} |
| 281 | wg.Add(2) |
| 282 | go func() { |
| 283 | defer func() { |
| 284 | panichandler.PanicHandler("HandleWsInternal:outputCh", recover()) |
| 285 | }() |
| 286 | // no waitgroup add here |
| 287 | // move values from rpcOutputCh to outputCh |
| 288 | for msgBytes := range wproxy.ToRemoteCh { |
| 289 | rpcWSMsg := map[string]any{ |
| 290 | "eventtype": "rpc", // TODO don't hard code this (but def is in eventbus) |
| 291 | "data": json.RawMessage(msgBytes), |
| 292 | } |
| 293 | outputCh <- rpcWSMsg |
| 294 | } |
| 295 | }() |
| 296 | go func() { |
| 297 | defer func() { |
| 298 | panichandler.PanicHandler("HandleWsInternal:ReadLoop", recover()) |
| 299 | }() |
| 300 | defer wg.Done() |
| 301 | ReadLoop(conn, outputCh, closeCh, wproxy.FromRemoteCh, stableId) |
| 302 | }() |
| 303 | go func() { |
| 304 | defer func() { |
| 305 | panichandler.PanicHandler("HandleWsInternal:WriteLoop", recover()) |
| 306 | }() |
| 307 | defer wg.Done() |
| 308 | WriteLoop(conn, outputCh, closeCh, stableId) |
| 309 | }() |
| 310 | wg.Wait() |
no test coverage detected