MCPcopy
hub / github.com/dgraph-io/dgraph / runForwardSubscriber

Method runForwardSubscriber

worker/import.go:154–215  ·  view source on GitHub ↗
(ctx context.Context, out api.Dgraph_StreamExtSnapshotClient, peerId string)

Source from the content-addressed store, hash-verified

152}
153
154func (ps *pubSub) runForwardSubscriber(ctx context.Context, out api.Dgraph_StreamExtSnapshotClient, peerId string) error {
155 defer func() {
156 glog.Infof("[import] forward subscriber stopped for peer [%v]", peerId)
157 }()
158
159 buffer := ps.subscribe()
160 defer ps.unsubscribe(buffer) // ensure publisher won't block on us if we exit
161
162Loop:
163 for {
164 select {
165 case <-ctx.Done():
166 glog.Infof("[import] Context cancelled, stopping receive goroutine: %v", ctx.Err())
167 return ctx.Err()
168
169 default:
170 msg, ok := <-buffer
171 if !ok {
172 break Loop
173 }
174
175 if msg.Pkt.Done {
176 glog.Infof("[import] received done signal from [%v]", peerId)
177 d := api.StreamPacket{Done: true}
178 if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: &d}); err != nil {
179 return err
180 }
181
182 _ = out.CloseSend()
183
184 for {
185 if ctx.Err() != nil {
186 return ctx.Err()
187 }
188 r, err := out.Recv()
189 if errors.Is(err, io.EOF) {
190 return fmt.Errorf("server closed stream before Finish=true for peer [%v]", peerId)
191 }
192 if err != nil {
193 return fmt.Errorf("failed to receive final response from peer [%v]: %w", peerId, err)
194 }
195 if r.Finish {
196 glog.Infof("[import] peer [%v]: Received final Finish=true", peerId)
197 break Loop
198 }
199 glog.Infof("[import] peer [%v]: Waiting for Finish=true, got interim ACK", peerId)
200 }
201 }
202
203 data := &api.StreamExtSnapshotRequest{Pkt: &api.StreamPacket{Data: msg.Pkt.Data}}
204 if err := out.Send(data); err != nil {
205 return err
206 }
207
208 if _, err := out.Recv(); err != nil {
209 return fmt.Errorf("failed to receive response from peer [%v]: %w", peerId, err)
210 }
211 }

Callers 1

streamInGroup (Function) · 0.95

Calls 7

subscribe (Method) · 0.95
unsubscribe (Method) · 0.95
Infof (Method) · 0.80
Send (Method) · 0.65
Recv (Method) · 0.65
Done (Method) · 0.45
Errorf (Method) · 0.45

Tested by

no test coverage detected