queueOut attempts to send a list of ServerComMessages to a session write loop; it fails if the send buffer is full.
(msgs []*ServerComMessage)
| 270 | // queueOut attempts to send a list of ServerComMessages to a session write loop; |
| 271 | // it fails if the send buffer is full. |
| 272 | func (s *Session) queueOutBatch(msgs []*ServerComMessage) bool { |
| 273 | if s == nil { |
| 274 | return true |
| 275 | } |
| 276 | if atomic.LoadInt32(&s.terminating) > 0 { |
| 277 | return true |
| 278 | } |
| 279 | |
| 280 | if s.multi != nil { |
| 281 | // In case of a cluster we need to pass a copy of the actual session. |
| 282 | for i := range msgs { |
| 283 | msgs[i].sess = s |
| 284 | } |
| 285 | if s.multi.queueOutBatch(msgs) { |
| 286 | s.multi.scheduleClusterWriteLoop() |
| 287 | return true |
| 288 | } |
| 289 | return false |
| 290 | } |
| 291 | |
| 292 | if s.supportsMessageBatching() { |
| 293 | select { |
| 294 | case s.send <- msgs: |
| 295 | default: |
| 296 | // Never block here since it may also block the topic's run() goroutine. |
| 297 | logs.Err.Println("s.queueOut: session's send queue2 full", s.sid) |
| 298 | return false |
| 299 | } |
| 300 | if s.isMultiplex() { |
| 301 | s.scheduleClusterWriteLoop() |
| 302 | } |
| 303 | } else { |
| 304 | for _, msg := range msgs { |
| 305 | s.queueOut(msg) |
| 306 | } |
| 307 | } |
| 308 | |
| 309 | return true |
| 310 | } |
| 311 | |
| 312 | // queueOut attempts to send a ServerComMessage to a session write loop; |
| 313 | // it fails, if the send buffer is full. |
no test coverage detected