| 391 | } |
| 392 | |
| 393 | func pipeTwoStream(in api.Dgraph_StreamExtSnapshotServer, out pb.Worker_StreamExtSnapshotClient, groupId uint32) error { |
| 394 | currentGroup := groups().Node.gid |
| 395 | ctx := in.Context() |
| 396 | |
| 397 | for { |
| 398 | if err := ctx.Err(); err != nil { |
| 399 | return err |
| 400 | } |
| 401 | |
| 402 | req, err := in.Recv() |
| 403 | if errors.Is(err, io.EOF) { |
| 404 | return nil |
| 405 | } |
| 406 | if err != nil { |
| 407 | return fmt.Errorf("recv upstream(%d): %w", currentGroup, err) |
| 408 | } |
| 409 | if req.Pkt == nil { |
| 410 | return fmt.Errorf("unexpected empty request") |
| 411 | } |
| 412 | |
| 413 | if req.Pkt.Done { |
| 414 | // Forward Done, half-close downstream send. |
| 415 | if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: req.Pkt}); err != nil && !errors.Is(err, io.EOF) { |
| 416 | return fmt.Errorf("send done downstream(%d): %w", groupId, err) |
| 417 | } |
| 418 | _ = out.CloseSend() |
| 419 | |
| 420 | // Drain downstream and relay upstream until Finish=true. |
| 421 | for { |
| 422 | if err := ctx.Err(); err != nil { |
| 423 | return err |
| 424 | } |
| 425 | resp, err := out.Recv() |
| 426 | if errors.Is(err, io.EOF) { |
| 427 | return fmt.Errorf("downstream(%d) closed before Finish=true", groupId) |
| 428 | } |
| 429 | if err != nil { |
| 430 | return fmt.Errorf("recv final downstream(%d): %w", groupId, err) |
| 431 | } |
| 432 | if err := in.Send(resp); err != nil { |
| 433 | return fmt.Errorf("relay final upstream: %w", err) |
| 434 | } |
| 435 | if resp.Finish { |
| 436 | glog.Infof("[import] [forward %d -> %d] finish", currentGroup, groupId) |
| 437 | return nil |
| 438 | } |
| 439 | } |
| 440 | } |
| 441 | |
| 442 | // Normal data chunk: send -> wait ack -> send upstream ack. |
| 443 | if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: req.Pkt}); err != nil { |
| 444 | return fmt.Errorf("send data downstream(%d): %w", groupId, err) |
| 445 | } |
| 446 | if _, err := out.Recv(); err != nil { |
| 447 | return fmt.Errorf("ack data downstream(%d): %w", groupId, err) |
| 448 | } |
| 449 | if err := in.Send(&api.StreamExtSnapshotResponse{}); err != nil { |
| 450 | return fmt.Errorf("send ack upstream: %w", err) |