MCPcopy
hub / github.com/grpc/grpc-go / withRetry

Method withRetry

stream.go:816–860  ·  view source on GitHub ↗
(op func(a *csAttempt) error, onSuccess func())

Source from the content-addressed store, hash-verified

814}
815
816func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
817 cs.mu.Lock()
818 for {
819 if cs.committed {
820 cs.mu.Unlock()
821 // toRPCErr is used in case the error from the attempt comes from
822 // NewClientStream, which intentionally doesn't return a status
823 // error to allow for further inspection; all other errors should
824 // already be status errors.
825 return toRPCErr(op(cs.attempt))
826 }
827 if len(cs.replayBuffer) == 0 {
828 // For the first op, which controls creation of the stream and
829 // assigns cs.attempt, we need to create a new attempt inline
830 // before executing the first op. On subsequent ops, the attempt
831 // is created immediately before replaying the ops.
832 var err error
833 if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
834 cs.mu.Unlock()
835 cs.finish(err)
836 return err
837 }
838 }
839 a := cs.attempt
840 cs.mu.Unlock()
841 err := op(a)
842 cs.mu.Lock()
843 if a != cs.attempt {
844 // We started another attempt already.
845 continue
846 }
847 if err == io.EOF {
848 <-a.transportStream.Done()
849 }
850 if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) {
851 onSuccess()
852 cs.mu.Unlock()
853 return err
854 }
855 if err := cs.retryLocked(a, err); err != nil {
856 cs.mu.Unlock()
857 return err
858 }
859 }
860}
861
862func (cs *clientStream) Header() (metadata.MD, error) {
863 var m metadata.MD

Callers 5

HeaderMethod · 0.95
SendMsgMethod · 0.95
RecvMsgMethod · 0.95
CloseSendMethod · 0.95

Calls 9

newAttemptLockedMethod · 0.95
finishMethod · 0.95
retryLockedMethod · 0.95
toRPCErrFunction · 0.85
CodeMethod · 0.80
StatusMethod · 0.80
LockMethod · 0.45
UnlockMethod · 0.45
DoneMethod · 0.45

Tested by

no test coverage detected