(op func(a *csAttempt) error, onSuccess func())
| 814 | } |
| 815 | |
// withRetry runs op against the stream's current attempt. When op fails, the
// error is handed to cs.retryLocked: a nil result from retryLocked restarts
// the loop, a non-nil result is returned to the caller.
//
// op is always invoked with cs.mu released. onSuccess is invoked with cs.mu
// held, and only on the not-yet-committed path, when op returns nil or
// returns io.EOF while the attempt's transport stream status code is
// codes.OK.
//
// The return value is one of: op's own result (nil or io.EOF) on success;
// op's result passed through toRPCErr when the stream is already committed;
// the error from cs.newAttemptLocked when the first attempt cannot be
// created; or the non-nil error from cs.retryLocked.
| 816 | func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error { |
| 817 | cs.mu.Lock() |
// cs.mu is held at the top of every iteration and is released on every
// return path below.
| 818 | for { |
| 819 | if cs.committed { |
// Committed path: op is run outside the lock and its result is returned
// directly; this path neither calls cs.retryLocked nor onSuccess.
| 820 | cs.mu.Unlock() |
| 821 | // toRPCErr is used in case the error from the attempt comes from |
| 822 | // NewClientStream, which intentionally doesn't return a status |
| 823 | // error to allow for further inspection; all other errors should |
| 824 | // already be status errors. |
| 825 | return toRPCErr(op(cs.attempt)) |
| 826 | } |
| 827 | if len(cs.replayBuffer) == 0 { |
| 828 | // For the first op, which controls creation of the stream and |
| 829 | // assigns cs.attempt, we need to create a new attempt inline |
| 830 | // before executing the first op. On subsequent ops, the attempt |
| 831 | // is created immediately before replaying the ops. |
| 832 | var err error |
// Note: cs.attempt is overwritten with newAttemptLocked's first result even
// when err is non-nil.
| 833 | if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil { |
// cs.finish is called only after cs.mu has been released.
// NOTE(review): presumably finish acquires cs.mu itself -- confirm.
| 834 | cs.mu.Unlock() |
| 835 | cs.finish(err) |
| 836 | return err |
| 837 | } |
| 838 | } |
// Snapshot the attempt while the lock is held so the identity check after op
// can detect that cs.attempt was replaced while the lock was released.
| 839 | a := cs.attempt |
| 840 | cs.mu.Unlock() |
| 841 | err := op(a) |
| 842 | cs.mu.Lock() |
| 843 | if a != cs.attempt { |
| 844 | // We started another attempt already. |
// op's result for the stale attempt is discarded; the loop re-runs op against
// the new cs.attempt (or takes the committed path above).
| 845 | continue |
| 846 | } |
| 847 | if err == io.EOF { |
// This receive blocks while cs.mu is held (it was reacquired above).
// NOTE(review): presumably Done() is closed once the transport stream has
// finished, so the Status() read below is final -- confirm.
| 848 | <-a.transportStream.Done() |
| 849 | } |
// Success: op returned nil, or returned io.EOF and the transport stream's
// status code is codes.OK. err is returned unchanged (nil or io.EOF).
| 850 | if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) { |
| 851 | onSuccess() |
| 852 | cs.mu.Unlock() |
| 853 | return err |
| 854 | } |
// Failure: the inner err shadows op's error for this if statement only. A nil
// result from retryLocked falls through to the next iteration with cs.mu
// still held.
| 855 | if err := cs.retryLocked(a, err); err != nil { |
| 856 | cs.mu.Unlock() |
| 857 | return err |
| 858 | } |
| 859 | } |
| 860 | } |
| 861 | |
| 862 | func (cs *clientStream) Header() (metadata.MD, error) { |
| 863 | var m metadata.MD |
no test coverage detected