processOracleDeltaStream is used to process oracle delta stream from Zero. Zero sends information about aborted/committed transactions and maxPending.
()
| 924 | // processOracleDeltaStream is used to process oracle delta stream from Zero. |
| 925 | // Zero sends information about aborted/committed transactions and maxPending. |
| 926 | func (g *groupi) processOracleDeltaStream() { |
| 927 | defer func() { |
| 928 | glog.Infoln("Closing processOracleDeltaStream") |
| 929 | g.closer.Done() // CLOSER:1 |
| 930 | }() |
| 931 | |
| 932 | ticker := time.Tick(time.Second) |
| 933 | |
| 934 | blockingReceiveAndPropose := func() { |
| 935 | glog.Infof("Leader idx=%#x of group=%d is connecting to Zero for txn updates\n", |
| 936 | g.Node.Id, g.groupId()) |
| 937 | |
| 938 | pl := g.connToZeroLeader() |
| 939 | if pl == nil { |
| 940 | glog.Warningln("Oracle delta stream: No Zero leader known.") |
| 941 | if g.IsClosed() { |
| 942 | return |
| 943 | } |
| 944 | time.Sleep(time.Second) |
| 945 | return |
| 946 | } |
| 947 | glog.Infof("Got Zero leader: %s", pl.Addr) |
| 948 | |
| 949 | // The following code creates a stream. Then runs a goroutine to pick up events from the |
| 950 | // stream and pushes them to a channel. The main loop loops over the channel, doing smart |
| 951 | // batching. Once a batch is created, it gets proposed. Thus, we can reduce the number of |
| 952 | // times proposals happen, which is a great optimization to have (and a common one in our |
| 953 | // code base). |
| 954 | ctx, cancel := context.WithCancel(g.Ctx()) |
| 955 | defer cancel() |
| 956 | |
| 957 | c := pb.NewZeroClient(pl.Get()) |
| 958 | stream, err := c.Oracle(ctx, &api.Payload{}) |
| 959 | if err != nil { |
| 960 | glog.Errorf("Error while calling Oracle %v\n", err) |
| 961 | time.Sleep(time.Second) |
| 962 | return |
| 963 | } |
| 964 | |
| 965 | deltaCh := make(chan *pb.OracleDelta, 100) |
| 966 | go func() { |
| 967 | // This would exit when either a Recv() returns error. Or, cancel() is called by |
| 968 | // something outside of this goroutine. |
| 969 | defer func() { |
| 970 | if err := stream.CloseSend(); err != nil { |
| 971 | glog.Errorf("Error closing send stream: %+v", err) |
| 972 | } |
| 973 | }() |
| 974 | defer close(deltaCh) |
| 975 | |
| 976 | for { |
| 977 | delta, err := stream.Recv() |
| 978 | if err != nil || delta == nil { |
| 979 | glog.Errorf("Error in oracle delta stream. Error: %v", err) |
| 980 | return |
| 981 | } |
| 982 | |
| 983 | select { |
no test coverage detected