MCPcopy
hub / github.com/SurgeDM/Surge / eventsHandler

Function eventsHandler

cmd/http_api.go:294–342  ·  view source on GitHub ↗
(service core.DownloadService)

Source from the content-addressed store, hash-verified

292}
293
294func eventsHandler(service core.DownloadService) http.HandlerFunc {
295 return func(w http.ResponseWriter, r *http.Request) {
296 w.Header().Set("Content-Type", "text/event-stream")
297 w.Header().Set("Cache-Control", "no-cache")
298 w.Header().Set("Connection", "keep-alive")
299 w.Header().Set("Access-Control-Allow-Origin", "*")
300
301 stream, cleanup, err := service.StreamEvents(r.Context())
302 if err != nil {
303 http.Error(w, "Failed to subscribe to events", http.StatusInternalServerError)
304 return
305 }
306 defer cleanup()
307
308 flusher, ok := w.(http.Flusher)
309 if !ok {
310 http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
311 return
312 }
313 flusher.Flush()
314
315 done := r.Context().Done()
316 for {
317 select {
318 case <-done:
319 return
320 case msg, ok := <-stream:
321 if !ok {
322 return
323 }
324
325 frames, err := events.EncodeSSEMessages(msg)
326 if err != nil {
327 utils.Debug("Error encoding SSE event: %v", err)
328 continue
329 }
330 if len(frames) == 0 {
331 continue
332 }
333
334 for _, frame := range frames {
335 _, _ = fmt.Fprintf(w, "event: %s\n", frame.Event)
336 _, _ = fmt.Fprintf(w, "data: %s\n\n", frame.Data)
337 }
338 flusher.Flush()
339 }
340 }
341 }
342}
343
344func requireMethod(method string, next http.HandlerFunc) http.HandlerFunc {
345 return requireMethods(next, method)

Callers 1

registerHTTPRoutesFunction · 0.85

Calls 5

EncodeSSEMessagesFunction · 0.92
DebugFunction · 0.92
ErrorMethod · 0.80
FlushMethod · 0.80
StreamEventsMethod · 0.65

Tested by

no test coverage detected