BatchAndSendMessages sends messages in batches.
()
| 336 | |
| 337 | // BatchAndSendMessages sends messages in batches. |
| 338 | func (n *Node) BatchAndSendMessages() { |
| 339 | batches := make(map[uint64]*bytes.Buffer) |
| 340 | streams := make(map[uint64]*stream) |
| 341 | |
| 342 | for { |
| 343 | totalSize := 0 |
| 344 | sm := <-n.messages |
| 345 | slurp_loop: |
| 346 | for { |
| 347 | var buf *bytes.Buffer |
| 348 | if b, ok := batches[sm.to]; !ok { |
| 349 | buf = new(bytes.Buffer) |
| 350 | batches[sm.to] = buf |
| 351 | } else { |
| 352 | buf = b |
| 353 | } |
| 354 | totalSize += 4 + len(sm.data) |
| 355 | x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data)))) |
| 356 | x.Check2(buf.Write(sm.data)) |
| 357 | |
| 358 | if totalSize > messageBatchSoftLimit { |
| 359 | // We limit the batch size, but we aren't pushing back on |
| 360 | // n.messages, because the loop below spawns a goroutine |
| 361 | // to do its dirty work. This is good because right now |
| 362 | // (*node).send fails(!) if the channel is full. |
| 363 | break |
| 364 | } |
| 365 | |
| 366 | select { |
| 367 | case sm = <-n.messages: |
| 368 | default: |
| 369 | break slurp_loop |
| 370 | } |
| 371 | } |
| 372 | |
| 373 | for to, buf := range batches { |
| 374 | if buf.Len() == 0 { |
| 375 | continue |
| 376 | } |
| 377 | s, ok := streams[to] |
| 378 | if !ok || atomic.LoadInt32(&s.alive) <= 0 { |
| 379 | s = &stream{ |
| 380 | msgCh: make(chan []byte, 100), |
| 381 | alive: 1, |
| 382 | } |
| 383 | go n.streamMessages(to, s) |
| 384 | streams[to] = s |
| 385 | } |
| 386 | data := make([]byte, buf.Len()) |
| 387 | copy(data, buf.Bytes()) |
| 388 | buf.Reset() |
| 389 | |
| 390 | select { |
| 391 | case s.msgCh <- data: |
| 392 | default: |
| 393 | } |
| 394 | } |
| 395 | } |