streamBadger runs a BadgerDB stream, created at the maximum sequence number (math.MaxUint64), and sends the resulting key-value data to the specified group, waiting for a server response after each chunk. Once streaming completes, it sends a final 'done' signal and waits until the server replies with Finish=true.
(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSnapshotClient, groupId uint32)
| 213 | // It creates a new stream at the maximum sequence number and sends the data to the specified group. |
| 214 | // It also sends a final 'done' signal to mark completion. |
| 215 | func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSnapshotClient, groupId uint32) error { |
| 216 | stream := ps.NewStreamAt(math.MaxUint64) |
| 217 | stream.LogPrefix = "[import] Sending external snapshot to group [" + fmt.Sprintf("%d", groupId) + "]" |
| 218 | stream.KeyToList = nil |
| 219 | stream.Send = func(buf *z.Buffer) error { |
| 220 | p := &api.StreamPacket{Data: buf.Bytes()} |
| 221 | if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) { |
| 222 | return fmt.Errorf("failed to send data chunk: %w", err) |
| 223 | } |
| 224 | if _, err := out.Recv(); err != nil { |
| 225 | return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err) |
| 226 | } |
| 227 | glog.Infof("[import] Group [%v]: Received ACK for sending data chunk", groupId) |
| 228 | |
| 229 | return nil |
| 230 | } |
| 231 | |
| 232 | // Execute the stream process |
| 233 | if err := stream.Orchestrate(ctx); err != nil { |
| 234 | return fmt.Errorf("stream orchestration failed for group [%v]: %w, badger path: %s", groupId, err, ps.Opts().Dir) |
| 235 | } |
| 236 | |
| 237 | // Send the final 'done' signal to mark completion |
| 238 | glog.Infof("[import] Sending completion signal for group [%d]", groupId) |
| 239 | done := &api.StreamPacket{Done: true} |
| 240 | |
| 241 | if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: done}); err != nil && !errors.Is(err, io.EOF) { |
| 242 | return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err) |
| 243 | } |
| 244 | |
| 245 | for { |
| 246 | if ctx.Err() != nil { |
| 247 | return ctx.Err() |
| 248 | } |
| 249 | resp, err := out.Recv() |
| 250 | if errors.Is(err, io.EOF) { |
| 251 | return fmt.Errorf("server closed stream before Finish=true for group [%d]", groupId) |
| 252 | } |
| 253 | if err != nil { |
| 254 | return fmt.Errorf("failed to receive final response for group ID [%v] from the server: %w", groupId, err) |
| 255 | } |
| 256 | if resp.Finish { |
| 257 | glog.Infof("[import] Group [%v]: Received final Finish=true", groupId) |
| 258 | break |
| 259 | } |
| 260 | glog.Infof("[import] Group [%v]: Waiting for Finish=true, got interim ACK", groupId) |
| 261 | } |
| 262 | |
| 263 | glog.Infof("[import] Group [%v]: Received ACK for sending completion signal", groupId) |
| 264 | |
| 265 | return nil |
| 266 | } |
no test coverage detected