WaitForZeroAddress polls /state until the member identified by raftID has wantAddr. It first tries queryIdx; if that node's /state is unavailable (e.g. it temporarily lost quorum during a leadership change) it falls back to any other Zero in the cluster. The last observed address and an error are returned if the target is not reached within timeout.
(queryIdx int, raftID, wantAddr string, timeout, poll time.Duration)
| 139 | // to any other Zero in the cluster. The last observed address and an error |
| 140 | // are returned if the target is not reached within timeout. |
| 141 | func (c *LocalCluster) WaitForZeroAddress(queryIdx int, raftID, wantAddr string, |
| 142 | timeout, poll time.Duration) (string, error) { |
| 143 | |
| 144 | deadline := time.Now().Add(timeout) |
| 145 | var lastAddr string |
| 146 | var lastErr error |
| 147 | for time.Now().Before(deadline) { |
| 148 | // Try queryIdx first, fall back to other nodes when quorum is transiently lost. |
| 149 | for _, idx := range append([]int{queryIdx}, otherZeroIdxs(queryIdx, c.conf.numZeros)...) { |
| 150 | state, err := c.GetZeroState(idx) |
| 151 | if err != nil { |
| 152 | lastErr = err |
| 153 | continue |
| 154 | } |
| 155 | z, ok := state.Zeros[raftID] |
| 156 | if !ok { |
| 157 | lastErr = fmt.Errorf("raft id %s not present in /state", raftID) |
| 158 | continue |
| 159 | } |
| 160 | lastAddr = z.Addr |
| 161 | if lastAddr == wantAddr { |
| 162 | return lastAddr, nil |
| 163 | } |
| 164 | break |
| 165 | } |
| 166 | time.Sleep(poll) |
| 167 | } |
| 168 | if lastErr != nil { |
| 169 | return lastAddr, errors.Wrapf(lastErr, |
| 170 | "timed out waiting for zero %s addr=%q on zero%d (last seen %q)", |
| 171 | raftID, wantAddr, queryIdx, lastAddr) |
| 172 | } |
| 173 | return lastAddr, fmt.Errorf( |
| 174 | "timed out waiting for zero %s addr=%q on zero%d (last seen %q)", |
| 175 | raftID, wantAddr, queryIdx, lastAddr) |
| 176 | } |
| 177 | |
| 178 | // otherZeroIdxs returns all zero indices except exclude, preserving order. |
| 179 | func otherZeroIdxs(exclude, numZeros int) []int { |