| 749 | } |
| 750 | |
| 751 | func (n *Node) joinCluster(ctx context.Context, rc *pb.RaftContext) (*api.Payload, error) { |
| 752 | // Only process one JoinCluster request at a time. |
| 753 | n.joinLock.Lock() |
| 754 | defer n.joinLock.Unlock() |
| 755 | |
| 756 | // Check that the new node is from the same group as me. |
| 757 | if rc.Group != n.RaftContext.Group { |
| 758 | return nil, errors.Errorf("Raft group mismatch") |
| 759 | } |
| 760 | // Also check that the new node is not me. |
| 761 | if rc.Id == n.RaftContext.Id { |
| 762 | return nil, errors.Errorf("REUSE_RAFTID: Raft ID duplicates mine: %+v", rc) |
| 763 | } |
| 764 | |
| 765 | // Reject if a peer with the same Raft ID is already registered at a |
| 766 | // different address AND that peer is still genuinely connected. Using the |
| 767 | // gRPC connection state is more accurate than IsHealthy(), which relies on |
| 768 | // a heartbeat timestamp and stays "healthy" for ~2s after the peer drops. |
| 769 | if addr, ok := n.Peer(rc.Id); ok && rc.Addr != addr { |
| 770 | if pool, err := GetPools().Get(addr); err == nil && |
| 771 | pool.Get().GetState() == connectivity.Ready { |
| 772 | return &api.Payload{}, errors.Errorf( |
| 773 | "REUSE_ADDR: IP Address same as existing peer: %s", addr) |
| 774 | } |
| 775 | } |
| 776 | n.Connect(rc.Id, rc.Addr) |
| 777 | |
| 778 | err := n.addToCluster(context.Background(), rc) |
| 779 | glog.Infof("[%#x] Done joining cluster with err: %v", rc.Id, err) |
| 780 | return &api.Payload{}, err |
| 781 | } |