MCPcopy
hub / github.com/dgraph-io/dgraph / streamBadger

Function streamBadger

dgraph/cmd/dgraphimport/import_client.go:215–266  ·  view source on GitHub ↗

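streamBadger runs a BadgerDB stream to send key-value pairs to the specified group. It creates a new stream at the maximum sequence number (math.MaxUint64) and, for each chunk of data it sends, waits for a response from the server before continuing. It also sends a final 'done' signal to mark completion, then keeps receiving until the server replies with Finish=true.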

(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSnapshotClient, groupId uint32)

Source from the content-addressed store, hash-verified

213// It creates a new stream at the maximum sequence number and sends the data to the specified group.
214// It also sends a final 'done' signal to mark completion.
215func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSnapshotClient, groupId uint32) error {
216 stream := ps.NewStreamAt(math.MaxUint64)
217 stream.LogPrefix = "[import] Sending external snapshot to group [" + fmt.Sprintf("%d", groupId) + "]"
218 stream.KeyToList = nil
219 stream.Send = func(buf *z.Buffer) error {
220 p := &api.StreamPacket{Data: buf.Bytes()}
221 if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) {
222 return fmt.Errorf("failed to send data chunk: %w", err)
223 }
224 if _, err := out.Recv(); err != nil {
225 return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
226 }
227 glog.Infof("[import] Group [%v]: Received ACK for sending data chunk", groupId)
228
229 return nil
230 }
231
232 // Execute the stream process
233 if err := stream.Orchestrate(ctx); err != nil {
234 return fmt.Errorf("stream orchestration failed for group [%v]: %w, badger path: %s", groupId, err, ps.Opts().Dir)
235 }
236
237 // Send the final 'done' signal to mark completion
238 glog.Infof("[import] Sending completion signal for group [%d]", groupId)
239 done := &api.StreamPacket{Done: true}
240
241 if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: done}); err != nil && !errors.Is(err, io.EOF) {
242 return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
243 }
244
245 for {
246 if ctx.Err() != nil {
247 return ctx.Err()
248 }
249 resp, err := out.Recv()
250 if errors.Is(err, io.EOF) {
251 return fmt.Errorf("server closed stream before Finish=true for group [%d]", groupId)
252 }
253 if err != nil {
254 return fmt.Errorf("failed to receive final response for group ID [%v] from the server: %w", groupId, err)
255 }
256 if resp.Finish {
257 glog.Infof("[import] Group [%v]: Received final Finish=true", groupId)
258 break
259 }
260 glog.Infof("[import] Group [%v]: Waiting for Finish=true, got interim ACK", groupId)
261 }
262
263 glog.Infof("[import] Group [%v]: Received ACK for sending completion signal", groupId)
264
265 return nil
266}

Callers 1

streamSnapshotForGroupFunction · 0.85

Calls 4

InfofMethod · 0.80
SendMethod · 0.65
RecvMethod · 0.65
ErrorfMethod · 0.45

Tested by

no test coverage detected