WaitLinearizableRead waits until a linearizable read can be performed.
(ctx context.Context)
| 641 | |
| 642 | // WaitLinearizableRead waits until a linearizable read can be performed. |
| 643 | func (n *Node) WaitLinearizableRead(ctx context.Context) error { |
| 644 | span := trace.SpanFromContext(ctx) |
| 645 | span.AddEvent("WaitLinearizableRead") |
| 646 | |
| 647 | if num := atomic.AddUint64(&readIndexTotal, 1); num%1000 == 0 { |
| 648 | glog.V(2).Infof("ReadIndex Total: %d\n", num) |
| 649 | } |
| 650 | indexCh := make(chan uint64, 1) |
| 651 | select { |
| 652 | case n.requestCh <- linReadReq{indexCh: indexCh}: |
| 653 | span.AddEvent("Pushed to requestCh") |
| 654 | case <-ctx.Done(): |
| 655 | span.AddEvent("Context expired") |
| 656 | return ctx.Err() |
| 657 | } |
| 658 | |
| 659 | select { |
| 660 | case index := <-indexCh: |
| 661 | span.AddEvent(fmt.Sprintf("Received index %d", index)) |
| 662 | if index == 0 { |
| 663 | return errReadIndex |
| 664 | } else if num := atomic.AddUint64(&readIndexOk, 1); num%1000 == 0 { |
| 665 | glog.V(2).Infof("ReadIndex OK: %d\n", num) |
| 666 | } |
| 667 | err := n.Applied.WaitForMark(ctx, index) |
| 668 | span.AddEvent(fmt.Sprintf("Error from Applied.WaitForMark: %v", err)) |
| 669 | return err |
| 670 | case <-ctx.Done(): |
| 671 | span.AddEvent("Context expired") |
| 672 | return ctx.Err() |
| 673 | } |
| 674 | } |
| 675 | |
| 676 | // RunReadIndexLoop runs the RAFT index in a loop. |
| 677 | func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) { |
no test coverage detected