MCPcopy
hub / github.com/dgraph-io/dgraph / processOracleDeltaStream

Method processOracleDeltaStream

worker/groups.go:926–1095  ·  view source on GitHub ↗

processOracleDeltaStream processes the oracle delta stream from Zero. Zero sends information about aborted/committed transactions and maxPending.

()

Source from the content-addressed store, hash-verified

924// processOracleDeltaStream is used to process oracle delta stream from Zero.
925// Zero sends information about aborted/committed transactions and maxPending.
926func (g *groupi) processOracleDeltaStream() {
927 defer func() {
928 glog.Infoln("Closing processOracleDeltaStream")
929 g.closer.Done() // CLOSER:1
930 }()
931
932 ticker := time.Tick(time.Second)
933
934 blockingReceiveAndPropose := func() {
935 glog.Infof("Leader idx=%#x of group=%d is connecting to Zero for txn updates\n",
936 g.Node.Id, g.groupId())
937
938 pl := g.connToZeroLeader()
939 if pl == nil {
940 glog.Warningln("Oracle delta stream: No Zero leader known.")
941 if g.IsClosed() {
942 return
943 }
944 time.Sleep(time.Second)
945 return
946 }
947 glog.Infof("Got Zero leader: %s", pl.Addr)
948
949 // The following code creates a stream. Then runs a goroutine to pick up events from the
950 // stream and pushes them to a channel. The main loop loops over the channel, doing smart
951 // batching. Once a batch is created, it gets proposed. Thus, we can reduce the number of
952 // times proposals happen, which is a great optimization to have (and a common one in our
953 // code base).
954 ctx, cancel := context.WithCancel(g.Ctx())
955 defer cancel()
956
957 c := pb.NewZeroClient(pl.Get())
958 stream, err := c.Oracle(ctx, &api.Payload{})
959 if err != nil {
960 glog.Errorf("Error while calling Oracle %v\n", err)
961 time.Sleep(time.Second)
962 return
963 }
964
965 deltaCh := make(chan *pb.OracleDelta, 100)
966 go func() {
967 // This would exit when either a Recv() returns error. Or, cancel() is called by
968 // something outside of this goroutine.
969 defer func() {
970 if err := stream.CloseSend(); err != nil {
971 glog.Errorf("Error closing send stream: %+v", err)
972 }
973 }()
974 defer close(deltaCh)
975
976 for {
977 delta, err := stream.Recv()
978 if err != nil || delta == nil {
979 glog.Errorf("Error in oracle delta stream. Error: %v", err)
980 return
981 }
982
983 select {

Callers 1

StartRaftNodes (Function) · 0.80

Calls 15

groupId (Method) · 0.95
connToZeroLeader (Method) · 0.95
IsClosed (Method) · 0.95
Ctx (Method) · 0.95
Oracle (Method) · 0.95
Leader (Method) · 0.95
NewZeroClient (Function) · 0.92
Max (Function) · 0.92
Infof (Method) · 0.80
Slice (Method) · 0.80
Get (Method) · 0.65
Recv (Method) · 0.65

Tested by

no test coverage detected