MCPcopy
hub / github.com/dgraph-io/dgraph / InStream

Function InStream

worker/import.go:340–391  ·  view source on GitHub ↗

InStream handles streaming of snapshots to a target group. It first checks the group associated with the incoming stream and, if it's the same as the current node's group, it flushes the data using FlushKvs. If the group is different, it establishes a connection with the leader of that group and str

(stream api.Dgraph_StreamExtSnapshotServer)

Source from the content-addressed store, hash-verified

338// there are any issues in the process, such as a broken connection or failure to establish
339// a stream with the leader.
340func InStream(stream api.Dgraph_StreamExtSnapshotServer) error {
341 // Reject the stream unless an authorized UpdateExtSnapshotStreamingState(Start) has armed
342 // import mode. This is the control that holds even when no auth is configured (both auth
343 // gates fail open in that case). The wait absorbs a follower's brief Raft apply lag; a stream
344 // that was never armed waits out the bounded window and is then rejected before any
345 // destructive work runs.
346 if err := waitForExtSnapshotArmed(stream.Context()); err != nil {
347 glog.Errorf("[import] rejecting external snapshot stream: %v", err)
348 return err
349 }
350
351 req, err := stream.Recv()
352 if err != nil {
353 return fmt.Errorf("failed to receive initial stream message: %v", err)
354 }
355
356 if err := stream.Send(&api.StreamExtSnapshotResponse{Finish: false}); err != nil {
357 return fmt.Errorf("failed to send initial response: %v", err)
358 }
359
360 groupId := req.GroupId
361 if groupId == groups().Node.gid {
362 glog.Infof("[import] streaming external snapshot to current group [%v]", groupId)
363 return streamInGroup(stream, true)
364 }
365
366 glog.Infof("[import] streaming external snapshot to other group [%v]", groupId)
367 pl := groups().Leader(groupId)
368 if pl == nil {
369 glog.Errorf("[import] unable to connect to the leader of group [%v]", groupId)
370 return fmt.Errorf("unable to connect to the leader of group [%v] : %v", groupId, conn.ErrNoConnection)
371 }
372
373 con := pl.Get()
374 c := pb.NewWorkerClient(con)
375 alphaStream, err := c.StreamExtSnapshot(stream.Context())
376 if err != nil {
377 glog.Errorf("[import] failed to establish stream with leader: %v", err)
378 return fmt.Errorf("failed to establish stream with leader: %v", err)
379 }
380 glog.Infof("[import] [forward %d -> %d] start", groups().Node.gid, groupId)
381 glog.Infof("[import] [forward %v -> %d] start", groups().Node.MyAddr, groups().Leader(groupId).Addr)
382
383 glog.Infof("[import] sending forward true to leader of group [%v]", groupId)
384 forwardReq := &api.StreamExtSnapshotRequest{Forward: true}
385 if err := alphaStream.Send(forwardReq); err != nil {
386 glog.Errorf("[import] failed to send forward request: %v", err)
387 return fmt.Errorf("failed to send forward request: %v", err)
388 }
389
390 return pipeTwoStream(stream, alphaStream, groupId)
391}
392
393func pipeTwoStream(in api.Dgraph_StreamExtSnapshotServer, out pb.Worker_StreamExtSnapshotClient, groupId uint32) error {
394 currentGroup := groups().Node.gid

Callers 1

StreamExtSnapshotMethod · 0.92

Calls 12

StreamExtSnapshotMethod · 0.95
NewWorkerClientFunction · 0.92
waitForExtSnapshotArmedFunction · 0.85
groupsFunction · 0.85
streamInGroupFunction · 0.85
pipeTwoStreamFunction · 0.85
InfofMethod · 0.80
RecvMethod · 0.65
SendMethod · 0.65
GetMethod · 0.65
ErrorfMethod · 0.45
LeaderMethod · 0.45

Tested by

no test coverage detected