MCPcopy
hub / github.com/dgraph-io/dgraph / BatchAndSendMessages

Method BatchAndSendMessages

conn/node.go:338–396  ·  view source on GitHub ↗

BatchAndSendMessages sends messages in batches.

()

Source from the content-addressed store, hash-verified

336
337// BatchAndSendMessages sends messages in batches.
338func (n *Node) BatchAndSendMessages() {
339 batches := make(map[uint64]*bytes.Buffer)
340 streams := make(map[uint64]*stream)
341
342 for {
343 totalSize := 0
344 sm := <-n.messages
345 slurp_loop:
346 for {
347 var buf *bytes.Buffer
348 if b, ok := batches[sm.to]; !ok {
349 buf = new(bytes.Buffer)
350 batches[sm.to] = buf
351 } else {
352 buf = b
353 }
354 totalSize += 4 + len(sm.data)
355 x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
356 x.Check2(buf.Write(sm.data))
357
358 if totalSize > messageBatchSoftLimit {
359 // We limit the batch size, but we aren't pushing back on
360 // n.messages, because the loop below spawns a goroutine
361 // to do its dirty work. This is good because right now
362 // (*node).send fails(!) if the channel is full.
363 break
364 }
365
366 select {
367 case sm = <-n.messages:
368 default:
369 break slurp_loop
370 }
371 }
372
373 for to, buf := range batches {
374 if buf.Len() == 0 {
375 continue
376 }
377 s, ok := streams[to]
378 if !ok || atomic.LoadInt32(&s.alive) <= 0 {
379 s = &stream{
380 msgCh: make(chan []byte, 100),
381 alive: 1,
382 }
383 go n.streamMessages(to, s)
384 streams[to] = s
385 }
386 data := make([]byte, buf.Len())
387 copy(data, buf.Bytes())
388 buf.Reset()
389
390 select {
391 case s.msgCh <- data:
392 default:
393 }
394 }
395 }

Callers 2

initAndStartNodeMethod · 0.80
InitAndStartNodeMethod · 0.80

Calls 7

streamMessagesMethod · 0.95
CheckFunction · 0.92
Check2Function · 0.92
copyFunction · 0.85
WriteMethod · 0.65
LenMethod · 0.65
ResetMethod · 0.45

Tested by

no test coverage detected