waitForExtSnapshotArmed blocks until import mode is armed, the context is cancelled, or a bounded deadline elapses. Import mode is armed cluster-wide by an authorized UpdateExtSnapshotStreamingState(Start) proposal, which every group member applies via Raft (node.applyCommitted), so the flag is set
(ctx context.Context)
| 45 | // covers a follower's async-apply lag. It polls rather than sleeping for a fixed duration so it |
| 46 | // returns as soon as the flag flips. |
| 47 | func waitForExtSnapshotArmed(ctx context.Context) error { |
| 48 | if x.IsExtSnapshotStreamingStateTrue() { |
| 49 | return nil |
| 50 | } |
| 51 | |
| 52 | ctx, cancel := context.WithTimeout(ctx, extSnapshotArmWait) |
| 53 | defer cancel() |
| 54 | ticker := time.NewTicker(50 * time.Millisecond) |
| 55 | defer ticker.Stop() |
| 56 | for { |
| 57 | select { |
| 58 | case <-ctx.Done(): |
| 59 | return errExtSnapshotNotArmed |
| 60 | case <-ticker.C: |
| 61 | if x.IsExtSnapshotStreamingStateTrue() { |
| 62 | return nil |
| 63 | } |
| 64 | } |
| 65 | } |
| 66 | } |
| 67 | |
| 68 | type pubSub struct { |
| 69 | sync.RWMutex |
no test coverage detected