streamInGroup handles the streaming of data within a group. This function is called on both leader and follower nodes, with different behavior on each: - Leader (forward=true): the leader node receives data and forwards it to all group members. - Follower (forward=false): the follower node receives data and hands it to its local subscriber only; it does not forward the stream to other group members.
(stream api.Dgraph_StreamExtSnapshotServer, forward bool)
| 535 | // - nil: If streaming completes successfully |
| 536 | // - error: If there's an issue receiving data or if majority consensus isn't achieved (for leader) |
| 537 | func streamInGroup(stream api.Dgraph_StreamExtSnapshotServer, forward bool) error { |
| 538 | node := groups().Node |
| 539 | glog.Infof("[import] got stream, forwarding in group [%v]", forward) |
| 540 | |
| 541 | // We created this to check the majority |
| 542 | successfulNodes := make(map[string]bool) |
| 543 | |
| 544 | ps := &pubSub{} |
| 545 | eg, errGCtx := errgroup.WithContext(stream.Context()) |
| 546 | for _, member := range groups().state.Groups[node.gid].Members { |
| 547 | if member.Addr == node.MyAddr { |
| 548 | eg.Go(func() error { |
| 549 | if err := ps.runLocalSubscriber(errGCtx, stream); err != nil { |
| 550 | glog.Errorf("[import:flush] failed to run local subscriber: %v", err) |
| 551 | updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, false) |
| 552 | return err |
| 553 | } |
| 554 | |
| 555 | updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, true) |
| 556 | return nil |
| 557 | }) |
| 558 | continue |
| 559 | } |
| 560 | |
| 561 | if forward { |
| 562 | // We are not going to return any error from here because we care about the majority of nodes. |
| 563 | // If the majority of nodes are able to receive the data, the remaining ones can catch up later. |
| 564 | glog.Infof("[import] Streaming external snapshot to [%v] from [%v]", member.Addr, node.MyAddr) |
| 565 | eg.Go(func() error { |
| 566 | glog.Infof(`[import:forward] streaming external snapshot to [%v] from [%v]`, member.Addr, node.MyAddr) |
| 567 | if member.AmDead { |
| 568 | glog.Infof(`[import:forward] [%v] is dead, skipping`, member.Addr) |
| 569 | return nil |
| 570 | } |
| 571 | |
| 572 | pl, err := conn.GetPools().Get(member.Addr) |
| 573 | if err != nil { |
| 574 | updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, false) |
| 575 | glog.Errorf("connection error to [%v]: %v", member.Addr, err) |
| 576 | return nil |
| 577 | } |
| 578 | |
| 579 | c := pb.NewWorkerClient(pl.Get()) |
| 580 | peerStream, err := c.StreamExtSnapshot(errGCtx) |
| 581 | if err != nil { |
| 582 | updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, false) |
| 583 | glog.Errorf("failed to establish stream with peer %v: %v", member.Addr, err) |
| 584 | return nil |
| 585 | } |
| 586 | defer func() { |
| 587 | if err := peerStream.CloseSend(); err != nil { |
| 588 | glog.Errorf("[import:forward] failed to close stream with peer [%v]: %v", member.Addr, err) |
| 589 | } |
| 590 | }() |
| 591 | |
| 592 | forwardReq := &api.StreamExtSnapshotRequest{Forward: false} |
| 593 | if err := peerStream.Send(forwardReq); err != nil { |
| 594 | updateNodeStatus(&ps.RWMutex, successfulNodes, member.Addr, false) |
no test coverage detected