MCPcopy
hub / github.com/dgraph-io/dgraph / pipeTwoStream

Function pipeTwoStream

worker/import.go:393–454  ·  view source on GitHub ↗
(in api.Dgraph_StreamExtSnapshotServer, out pb.Worker_StreamExtSnapshotClient, groupId uint32)

Source from the content-addressed store, hash-verified

391}
392
393func pipeTwoStream(in api.Dgraph_StreamExtSnapshotServer, out pb.Worker_StreamExtSnapshotClient, groupId uint32) error {
394 currentGroup := groups().Node.gid
395 ctx := in.Context()
396
397 for {
398 if err := ctx.Err(); err != nil {
399 return err
400 }
401
402 req, err := in.Recv()
403 if errors.Is(err, io.EOF) {
404 return nil
405 }
406 if err != nil {
407 return fmt.Errorf("recv upstream(%d): %w", currentGroup, err)
408 }
409 if req.Pkt == nil {
410 return fmt.Errorf("unexpected empty request")
411 }
412
413 if req.Pkt.Done {
414 // Forward Done, half-close downstream send.
415 if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: req.Pkt}); err != nil && !errors.Is(err, io.EOF) {
416 return fmt.Errorf("send done downstream(%d): %w", groupId, err)
417 }
418 _ = out.CloseSend()
419
420 // Drain downstream and relay upstream until Finish=true.
421 for {
422 if err := ctx.Err(); err != nil {
423 return err
424 }
425 resp, err := out.Recv()
426 if errors.Is(err, io.EOF) {
427 return fmt.Errorf("downstream(%d) closed before Finish=true", groupId)
428 }
429 if err != nil {
430 return fmt.Errorf("recv final downstream(%d): %w", groupId, err)
431 }
432 if err := in.Send(resp); err != nil {
433 return fmt.Errorf("relay final upstream: %w", err)
434 }
435 if resp.Finish {
436 glog.Infof("[import] [forward %d -> %d] finish", currentGroup, groupId)
437 return nil
438 }
439 }
440 }
441
442 // Normal data chunk: send -> wait ack -> send upstream ack.
443 if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: req.Pkt}); err != nil {
444 return fmt.Errorf("send data downstream(%d): %w", groupId, err)
445 }
446 if _, err := out.Recv(); err != nil {
447 return fmt.Errorf("ack data downstream(%d): %w", groupId, err)
448 }
449 if err := in.Send(&api.StreamExtSnapshotResponse{}); err != nil {
450 return fmt.Errorf("send ack upstream: %w", err)

Callers 1

InStream (Function) · 0.85

Calls 5

groups (Function) · 0.85
Infof (Method) · 0.80
Recv (Method) · 0.65
Send (Method) · 0.65
Errorf (Method) · 0.45

Tested by

no test coverage detected