(w http.ResponseWriter, r *http.Request)
| 454 | } |
| 455 | |
| 456 | func (h *httpHandlers) handleSSE(w http.ResponseWriter, r *http.Request) { |
| 457 | defer func() { |
| 458 | panicErr := util.PanicHandler("handleSSE", recover()) |
| 459 | if panicErr != nil { |
| 460 | http.Error(w, fmt.Sprintf("internal server error: %v", panicErr), http.StatusInternalServerError) |
| 461 | } |
| 462 | }() |
| 463 | |
| 464 | if r.Method != http.MethodGet { |
| 465 | http.Error(w, "method not allowed", http.StatusMethodNotAllowed) |
| 466 | return |
| 467 | } |
| 468 | |
| 469 | clientId := r.URL.Query().Get("clientId") |
| 470 | if err := h.Client.checkClientId(clientId); err != nil { |
| 471 | http.Error(w, fmt.Sprintf("client id error: %v", err), http.StatusBadRequest) |
| 472 | return |
| 473 | } |
| 474 | |
| 475 | // Generate unique connection ID for this SSE connection |
| 476 | connectionId := fmt.Sprintf("%s-%d", clientId, time.Now().UnixNano()) |
| 477 | |
| 478 | // Register SSE channel for this connection |
| 479 | eventCh := h.Client.RegisterSSEChannel(connectionId) |
| 480 | defer h.Client.UnregisterSSEChannel(connectionId) |
| 481 | |
| 482 | // Set SSE headers |
| 483 | setNoCacheHeaders(w) |
| 484 | w.Header().Set("Content-Type", "text/event-stream") |
| 485 | w.Header().Set("Connection", "keep-alive") |
| 486 | w.Header().Set("X-Accel-Buffering", "no") |
| 487 | w.Header().Set("X-Content-Type-Options", "nosniff") |
| 488 | |
| 489 | // Use ResponseController for better flushing control |
| 490 | rc := http.NewResponseController(w) |
| 491 | if err := rc.Flush(); err != nil { |
| 492 | http.Error(w, "streaming not supported", http.StatusInternalServerError) |
| 493 | return |
| 494 | } |
| 495 | |
| 496 | // Create a ticker for keepalive packets |
| 497 | keepaliveTicker := time.NewTicker(SSEKeepAliveDuration) |
| 498 | defer keepaliveTicker.Stop() |
| 499 | |
| 500 | for { |
| 501 | select { |
| 502 | case <-r.Context().Done(): |
| 503 | return |
| 504 | case <-keepaliveTicker.C: |
| 505 | // Send keepalive comment |
| 506 | fmt.Fprintf(w, ": keepalive\n\n") |
| 507 | rc.Flush() |
| 508 | case event := <-eventCh: |
| 509 | if event.Event == "" { |
| 510 | break |
| 511 | } |
| 512 | fmt.Fprintf(w, "event: %s\n", event.Event) |
| 513 | fmt.Fprintf(w, "data: %s\n", string(event.Data)) |
nothing calls this directly
no test coverage detected