MCPcopy
hub / github.com/dgraph-io/dgraph / runLocalSubscriber

Method runLocalSubscriber

worker/import.go:217–297  ·  view source on GitHub ↗
(ctx context.Context, stream pb.Worker_StreamExtSnapshotServer)

Source from the content-addressed store, hash-verified

215}
216
217func (ps *pubSub) runLocalSubscriber(ctx context.Context, stream pb.Worker_StreamExtSnapshotServer) error {
218 defer func() {
219 glog.Infof("[import] local subscriber stopped")
220 }()
221
222 buffer := ps.subscribe()
223 defer ps.unsubscribe(buffer) // ensure publisher won't block on us if we exit
224
225 // Defense in depth: never run the destructive Prepare()/dropAll() unless import mode has been
226 // armed by an authorized UpdateExtSnapshotStreamingState(Start). On a follower receiving a
227 // forwarded stream this may briefly lag the leader's Raft apply, so wait with a bounded
228 // deadline rather than failing instantly.
229 if err := waitForExtSnapshotArmed(ctx); err != nil {
230 glog.Errorf("[import:flush] refusing to write external snapshot: %v", err)
231 return err
232 }
233
234 glog.Infof("[import:flush] flushing external snapshot in badger db")
235
236 sw := pstore.NewStreamWriter()
237 defer sw.Cancel()
238
239 // Prepare() calls Badger's dropAll(), which is destructive. Defer it until the first valid
240 // data packet arrives so that a "Done-only" or otherwise empty stream completes as a no-op
241 // and leaves the existing store intact.
242 prepared := false
243
244Loop:
245 for {
246 select {
247 case <-ctx.Done():
248 glog.Infof("[import] Context cancelled, stopping receive goroutine: %v", ctx.Err())
249 return ctx.Err()
250
251 default:
252 msg, ok := <-buffer
253 if !ok {
254 break Loop
255 }
256 kvs := msg.GetPkt()
257 if kvs == nil {
258 continue
259 }
260 if kvs.Done {
261 break
262 }
263 if len(kvs.Data) == 0 {
264 continue
265 }
266
267 if !prepared {
268 if err := sw.Prepare(); err != nil {
269 return err
270 }
271 prepared = true
272 }
273
274 buf := z.NewBufferSlice(kvs.Data)

Callers 1

streamInGroupFunction · 0.95

Calls 9

subscribeMethod · 0.95
unsubscribeMethod · 0.95
waitForExtSnapshotArmedFunction · 0.85
postStreamProcessingFunction · 0.85
InfofMethod · 0.80
WriteMethod · 0.65
FlushMethod · 0.65
ErrorfMethod · 0.45
DoneMethod · 0.45

Tested by

no test coverage detected