MCPcopy
hub / github.com/CodisLabs/codis / loopWriter

Method loopWriter

pkg/proxy/session.go:196–237  ·  view source on GitHub ↗
(tasks *RequestChan)

Source from the content-addressed store, hash-verified

194}
195
196func (s *Session) loopWriter(tasks *RequestChan) (err error) {
197 defer func() {
198 s.CloseWithError(err)
199 tasks.PopFrontAllVoid(func(r *Request) {
200 s.incrOpFails(r, nil)
201 })
202 s.flushOpStats(true)
203 }()
204
205 var (
206 breakOnFailure = s.config.SessionBreakOnFailure
207 maxPipelineLen = s.config.SessionMaxPipeline
208 )
209
210 p := s.Conn.FlushEncoder()
211 p.MaxInterval = time.Millisecond
212 p.MaxBuffered = maxPipelineLen / 2
213
214 return tasks.PopFrontAll(func(r *Request) error {
215 resp, err := s.handleResponse(r)
216 if err != nil {
217 resp = redis.NewErrorf("ERR handle response, %s", err)
218 if breakOnFailure {
219 s.Conn.Encode(resp, true)
220 return s.incrOpFails(r, err)
221 }
222 }
223 if err := p.Encode(resp); err != nil {
224 return s.incrOpFails(r, err)
225 }
226 fflush := tasks.IsEmpty()
227 if err := p.Flush(fflush); err != nil {
228 return s.incrOpFails(r, err)
229 } else {
230 s.incrOpStats(r, resp.Type)
231 }
232 if fflush {
233 s.flushOpStats(false)
234 }
235 return nil
236 })
237}
238
239func (s *Session) handleResponse(r *Request) (*redis.Resp, error) {
240 r.Batch.Wait()

Callers 1

StartMethod · 0.95

Calls 11

CloseWithErrorMethod · 0.95
incrOpFailsMethod · 0.95
flushOpStatsMethod · 0.95
handleResponseMethod · 0.95
incrOpStatsMethod · 0.95
PopFrontAllVoidMethod · 0.80
FlushEncoderMethod · 0.80
PopFrontAllMethod · 0.80
IsEmptyMethod · 0.80
EncodeMethod · 0.45
FlushMethod · 0.45

Tested by

no test coverage detected