(ctx context.Context, stream pb.Worker_StreamExtSnapshotServer)
| 215 | } |
| 216 | |
| 217 | func (ps *pubSub) runLocalSubscriber(ctx context.Context, stream pb.Worker_StreamExtSnapshotServer) error { |
| 218 | defer func() { |
| 219 | glog.Infof("[import] local subscriber stopped") |
| 220 | }() |
| 221 | |
| 222 | buffer := ps.subscribe() |
| 223 | defer ps.unsubscribe(buffer) // ensure publisher won't block on us if we exit |
| 224 | |
| 225 | // Defense in depth: never run the destructive Prepare()/dropAll() unless import mode has been |
| 226 | // armed by an authorized UpdateExtSnapshotStreamingState(Start). On a follower receiving a |
| 227 | // forwarded stream this may briefly lag the leader's Raft apply, so wait with a bounded |
| 228 | // deadline rather than failing instantly. |
| 229 | if err := waitForExtSnapshotArmed(ctx); err != nil { |
| 230 | glog.Errorf("[import:flush] refusing to write external snapshot: %v", err) |
| 231 | return err |
| 232 | } |
| 233 | |
| 234 | glog.Infof("[import:flush] flushing external snapshot in badger db") |
| 235 | |
| 236 | sw := pstore.NewStreamWriter() |
| 237 | defer sw.Cancel() |
| 238 | |
| 239 | // Prepare() calls Badger's dropAll(), which is destructive. Defer it until the first valid |
| 240 | // data packet arrives so that a "Done-only" or otherwise empty stream completes as a no-op |
| 241 | // and leaves the existing store intact. |
| 242 | prepared := false |
| 243 | |
| 244 | Loop: |
| 245 | for { |
| 246 | select { |
| 247 | case <-ctx.Done(): |
| 248 | glog.Infof("[import] Context cancelled, stopping receive goroutine: %v", ctx.Err()) |
| 249 | return ctx.Err() |
| 250 | |
| 251 | default: |
| 252 | msg, ok := <-buffer |
| 253 | if !ok { |
| 254 | break Loop |
| 255 | } |
| 256 | kvs := msg.GetPkt() |
| 257 | if kvs == nil { |
| 258 | continue |
| 259 | } |
| 260 | if kvs.Done { |
| 261 | break |
| 262 | } |
| 263 | if len(kvs.Data) == 0 { |
| 264 | continue |
| 265 | } |
| 266 | |
| 267 | if !prepared { |
| 268 | if err := sw.Prepare(); err != nil { |
| 269 | return err |
| 270 | } |
| 271 | prepared = true |
| 272 | } |
| 273 | |
| 274 | buf := z.NewBufferSlice(kvs.Data) |
no test coverage detected