subscribe subscribes the given WebSocket to all broadcast messages. It creates a subscriber with a buffered msgs chan to give some room to slower connections and then registers the subscriber. It then listens for all messages and writes them to the WebSocket. If the context is cancelled or an error occurs, it returns and deletes the subscription.
(w http.ResponseWriter, r *http.Request)
| 112 | // It uses CloseRead to keep reading from the connection to process control |
| 113 | // messages and cancel the context if the connection drops. |
| 114 | func (cs *chatServer) subscribe(w http.ResponseWriter, r *http.Request) error { |
| 115 | var mu sync.Mutex |
| 116 | var c *websocket.Conn |
| 117 | var closed bool |
| 118 | s := &subscriber{ |
| 119 | msgs: make(chan []byte, cs.subscriberMessageBuffer), |
| 120 | closeSlow: func() { |
| 121 | mu.Lock() |
| 122 | defer mu.Unlock() |
| 123 | closed = true |
| 124 | if c != nil { |
| 125 | c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") |
| 126 | } |
| 127 | }, |
| 128 | } |
| 129 | cs.addSubscriber(s) |
| 130 | defer cs.deleteSubscriber(s) |
| 131 | |
| 132 | c2, err := websocket.Accept(w, r, nil) |
| 133 | if err != nil { |
| 134 | return err |
| 135 | } |
| 136 | mu.Lock() |
| 137 | if closed { |
| 138 | mu.Unlock() |
| 139 | return net.ErrClosed |
| 140 | } |
| 141 | c = c2 |
| 142 | mu.Unlock() |
| 143 | defer c.CloseNow() |
| 144 | |
| 145 | ctx := c.CloseRead(context.Background()) |
| 146 | |
| 147 | for { |
| 148 | select { |
| 149 | case msg := <-s.msgs: |
| 150 | err := writeTimeout(ctx, time.Second*5, c, msg) |
| 151 | if err != nil { |
| 152 | return err |
| 153 | } |
| 154 | case <-ctx.Done(): |
| 155 | return ctx.Err() |
| 156 | } |
| 157 | } |
| 158 | } |
| 159 | |
| 160 | // publish publishes the msg to all subscribers. |
| 161 | // It never blocks and so messages to slow subscribers |
no test coverage detected