RunReadIndexLoop runs the RAFT index in a loop.
(closer *z.Closer, readStateCh <-chan raft.ReadState)
| 675 | |
| 676 | // RunReadIndexLoop runs the RAFT index in a loop. |
| 677 | func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) { |
| 678 | defer closer.Done() |
| 679 | readIndex := func(activeRctx []byte) (uint64, error) { |
| 680 | // Read Request can get rejected then we would wait indefinitely on the channel |
| 681 | // so have a timeout. |
| 682 | ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) |
| 683 | defer cancel() |
| 684 | |
| 685 | if err := n.Raft().ReadIndex(ctx, activeRctx); err != nil { |
| 686 | glog.Errorf("Error while trying to call ReadIndex: %v\n", err) |
| 687 | return 0, err |
| 688 | } |
| 689 | |
| 690 | again: |
| 691 | select { |
| 692 | case <-closer.HasBeenClosed(): |
| 693 | return 0, errors.New("Closer has been called") |
| 694 | case rs := <-readStateCh: |
| 695 | if !bytes.Equal(activeRctx, rs.RequestCtx) { |
| 696 | glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx) |
| 697 | goto again |
| 698 | } |
| 699 | return rs.Index, nil |
| 700 | case <-ctx.Done(): |
| 701 | glog.Warningf("[%#x] Read index context timed out\n", n.Id) |
| 702 | return 0, errInternalRetry |
| 703 | } |
| 704 | } // end of readIndex func |
| 705 | |
| 706 | // We maintain one linearizable ReadIndex request at a time. Others wait queued behind |
| 707 | // requestCh. |
| 708 | requests := []linReadReq{} |
| 709 | for { |
| 710 | select { |
| 711 | case <-closer.HasBeenClosed(): |
| 712 | return |
| 713 | case <-readStateCh: |
| 714 | // Do nothing, discard ReadState as we don't have any pending ReadIndex requests. |
| 715 | case req := <-n.requestCh: |
| 716 | slurpLoop: |
| 717 | for { |
| 718 | requests = append(requests, req) |
| 719 | select { |
| 720 | case req = <-n.requestCh: |
| 721 | default: |
| 722 | break slurpLoop |
| 723 | } |
| 724 | } |
| 725 | // Create one activeRctx slice for the read index, even if we have to call readIndex |
| 726 | // repeatedly. That way, we can process the requests as soon as we encounter the first |
| 727 | // activeRctx. This is better than flooding readIndex with a new activeRctx on each |
| 728 | // call, causing more unique traffic and further delays in request processing. |
| 729 | activeRctx := make([]byte, 8) |
| 730 | x.Check2(n.Rand.Read(activeRctx)) |
| 731 | glog.V(4).Infof("Request readctx: %#x", activeRctx) |
| 732 | for { |
| 733 | index, err := readIndex(activeRctx) |
| 734 | if err == errInternalRetry { |