InStream handles streaming of snapshots to a target group. It first checks the group associated with the incoming stream and, if it's the same as the current node's group, it flushes the data using FlushKvs. If the group is different, it establishes a connection with the leader of that group and str
(stream api.Dgraph_StreamExtSnapshotServer)
| 338 | // there are any issues in the process, such as a broken connection or failure to establish |
| 339 | // a stream with the leader. |
| 340 | func InStream(stream api.Dgraph_StreamExtSnapshotServer) error { |
| 341 | // Reject the stream unless an authorized UpdateExtSnapshotStreamingState(Start) has armed |
| 342 | // import mode. This is the control that holds even when no auth is configured (both auth |
| 343 | // gates fail open in that case). The wait absorbs a follower's brief Raft apply lag; a stream |
| 344 | // that was never armed waits out the bounded window and is then rejected before any |
| 345 | // destructive work runs. |
| 346 | if err := waitForExtSnapshotArmed(stream.Context()); err != nil { |
| 347 | glog.Errorf("[import] rejecting external snapshot stream: %v", err) |
| 348 | return err |
| 349 | } |
| 350 | |
| 351 | req, err := stream.Recv() |
| 352 | if err != nil { |
| 353 | return fmt.Errorf("failed to receive initial stream message: %v", err) |
| 354 | } |
| 355 | |
| 356 | if err := stream.Send(&api.StreamExtSnapshotResponse{Finish: false}); err != nil { |
| 357 | return fmt.Errorf("failed to send initial response: %v", err) |
| 358 | } |
| 359 | |
| 360 | groupId := req.GroupId |
| 361 | if groupId == groups().Node.gid { |
| 362 | glog.Infof("[import] streaming external snapshot to current group [%v]", groupId) |
| 363 | return streamInGroup(stream, true) |
| 364 | } |
| 365 | |
| 366 | glog.Infof("[import] streaming external snapshot to other group [%v]", groupId) |
| 367 | pl := groups().Leader(groupId) |
| 368 | if pl == nil { |
| 369 | glog.Errorf("[import] unable to connect to the leader of group [%v]", groupId) |
| 370 | return fmt.Errorf("unable to connect to the leader of group [%v] : %v", groupId, conn.ErrNoConnection) |
| 371 | } |
| 372 | |
| 373 | con := pl.Get() |
| 374 | c := pb.NewWorkerClient(con) |
| 375 | alphaStream, err := c.StreamExtSnapshot(stream.Context()) |
| 376 | if err != nil { |
| 377 | glog.Errorf("[import] failed to establish stream with leader: %v", err) |
| 378 | return fmt.Errorf("failed to establish stream with leader: %v", err) |
| 379 | } |
| 380 | glog.Infof("[import] [forward %d -> %d] start", groups().Node.gid, groupId) |
| 381 | glog.Infof("[import] [forward %v -> %d] start", groups().Node.MyAddr, groups().Leader(groupId).Addr) |
| 382 | |
| 383 | glog.Infof("[import] sending forward true to leader of group [%v]", groupId) |
| 384 | forwardReq := &api.StreamExtSnapshotRequest{Forward: true} |
| 385 | if err := alphaStream.Send(forwardReq); err != nil { |
| 386 | glog.Errorf("[import] failed to send forward request: %v", err) |
| 387 | return fmt.Errorf("failed to send forward request: %v", err) |
| 388 | } |
| 389 | |
| 390 | return pipeTwoStream(stream, alphaStream, groupId) |
| 391 | } |
| 392 | |
| 393 | func pipeTwoStream(in api.Dgraph_StreamExtSnapshotServer, out pb.Worker_StreamExtSnapshotClient, groupId uint32) error { |
| 394 | currentGroup := groups().Node.gid |
no test coverage detected