MCPcopy
hub / github.com/dgraph-io/dgraph / streamInGroup

Function streamInGroup

worker/import.go:537–642  ·  view source on GitHub ↗

streamInGroup handles the streaming of data within a group. It is called on both leader and follower nodes, and behaves differently on each. On the leader (forward=true), the node receives the data and forwards it to all group members. On a follower (forward=false), the node receives the data and handles it with its local subscriber only, without forwarding it to other members.

(stream api.Dgraph_StreamExtSnapshotServer, forward bool)

Source from the content-addressed store, hash-verified

535// - nil: If streaming completes successfully
536// - error: If there's an issue receiving data or if majority consensus isn't achieved (for leader)
537func streamInGroup(stream api.Dgraph_StreamExtSnapshotServer, forward bool) error {
538 node := groups().Node
539 glog.Infof("[import] got stream, forwarding in group [%v]", forward)
540
541 // We created this to check the majority
542 successfulNodes := make(map[string]bool)
543
544 ps := &pubSub{}
545 eg, errGCtx := errgroup.WithContext(stream.Context())
546 for _, member := range groups().state.Groups[node.gid].Members {
547 if member.Addr == node.MyAddr {
548 eg.Go(func() error {
549 if err := ps.runLocalSubscriber(errGCtx, stream); err != nil {
550 glog.Errorf("[import:flush] failed to run local subscriber: %v", err)
551 updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, false)
552 return err
553 }
554
555 updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, true)
556 return nil
557 })
558 continue
559 }
560
561 if forward {
562 // We are not going to return any error from here because we care about the majority of nodes.
563 // If the majority of nodes are able to receive the data, the remaining ones can catch up later.
564 glog.Infof("[import] Streaming external snapshot to [%v] from [%v]", member.Addr, node.MyAddr)
565 eg.Go(func() error {
566 glog.Infof(`[import:forward] streaming external snapshot to [%v] from [%v]`, member.Addr, node.MyAddr)
567 if member.AmDead {
568 glog.Infof(`[import:forward] [%v] is dead, skipping`, member.Addr)
569 return nil
570 }
571
572 pl, err := conn.GetPools().Get(member.Addr)
573 if err != nil {
574 updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, false)
575 glog.Errorf("connection error to [%v]: %v", member.Addr, err)
576 return nil
577 }
578
579 c := pb.NewWorkerClient(pl.Get())
580 peerStream, err := c.StreamExtSnapshot(errGCtx)
581 if err != nil {
582 updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, false)
583 glog.Errorf("failed to establish stream with peer %v: %v", member.Addr, err)
584 return nil
585 }
586 defer func() {
587 if err := peerStream.CloseSend(); err != nil {
588 glog.Errorf("[import:forward] failed to close stream with peer [%v]: %v", member.Addr, err)
589 }
590 }()
591
592 forwardReq := &api.StreamExtSnapshotRequest{Forward: false}
593 if err := peerStream.Send(forwardReq); err != nil {
594 updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, false)

Callers 2

InStream (Function) · 0.85
StreamExtSnapshot (Method) · 0.85

Calls 15

runLocalSubscriber (Method) · 0.95
StreamExtSnapshot (Method) · 0.95
runForwardSubscriber (Method) · 0.95
close (Method) · 0.95
handlePublisher (Method) · 0.95
GetPools (Function) · 0.92
NewWorkerClient (Function) · 0.92
groups (Function) · 0.85
updateNodeStatus (Function) · 0.85
checkMajority (Function) · 0.85
Infof (Method) · 0.80
Wait (Method) · 0.80

Tested by

no test coverage detected