MCPcopy
hub / github.com/dgraph-io/dgraph / RunReadIndexLoop

Method RunReadIndexLoop

conn/node.go:677–749  ·  view source on GitHub ↗

RunReadIndexLoop runs the RAFT index in a loop.

(closer *z.Closer, readStateCh <-chan raft.ReadState)

Source from the content-addressed store, hash-verified

675
676// RunReadIndexLoop runs the RAFT index in a loop.
677func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) {
678 defer closer.Done()
679 readIndex := func(activeRctx []byte) (uint64, error) {
680 // Read Request can get rejected then we would wait indefinitely on the channel
681 // so have a timeout.
682 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
683 defer cancel()
684
685 if err := n.Raft().ReadIndex(ctx, activeRctx); err != nil {
686 glog.Errorf("Error while trying to call ReadIndex: %v\n", err)
687 return 0, err
688 }
689
690 again:
691 select {
692 case <-closer.HasBeenClosed():
693 return 0, errors.New("Closer has been called")
694 case rs := <-readStateCh:
695 if !bytes.Equal(activeRctx, rs.RequestCtx) {
696 glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx)
697 goto again
698 }
699 return rs.Index, nil
700 case <-ctx.Done():
701 glog.Warningf("[%#x] Read index context timed out\n", n.Id)
702 return 0, errInternalRetry
703 }
704 } // end of readIndex func
705
706 // We maintain one linearizable ReadIndex request at a time. Others wait queued behind
707 // requestCh.
708 requests := []linReadReq{}
709 for {
710 select {
711 case <-closer.HasBeenClosed():
712 return
713 case <-readStateCh:
714 // Do nothing, discard ReadState as we don't have any pending ReadIndex requests.
715 case req := <-n.requestCh:
716 slurpLoop:
717 for {
718 requests = append(requests, req)
719 select {
720 case req = <-n.requestCh:
721 default:
722 break slurpLoop
723 }
724 }
725 // Create one activeRctx slice for the read index, even if we have to call readIndex
726 // repeatedly. That way, we can process the requests as soon as we encounter the first
727 // activeRctx. This is better than flooding readIndex with a new activeRctx on each
728 // call, causing more unique traffic and further delays in request processing.
729 activeRctx := make([]byte, 8)
730 x.Check2(n.Rand.Read(activeRctx))
731 glog.V(4).Infof("Request readctx: %#x", activeRctx)
732 for {
733 index, err := readIndex(activeRctx)
734 if err == errInternalRetry {

Callers 1

RunMethod · 0.80

Calls 7

RaftMethod · 0.95
Check2Function · 0.92
InfofMethod · 0.80
WarningfMethod · 0.80
ReadMethod · 0.65
DoneMethod · 0.45
ErrorfMethod · 0.45

Tested by

no test coverage detected