MCPcopy
hub / github.com/CodisLabs/codis / loopWriter

Method loopWriter

pkg/proxy/backend.go:328–368  ·  view source on GitHub ↗
(round int)

Source from the content-addressed store, hash-verified

326}
327
328func (bc *BackendConn) loopWriter(round int) (err error) {
329 defer func() {
330 for i := len(bc.input); i != 0; i-- {
331 r := <-bc.input
332 bc.setResponse(r, nil, ErrBackendConnReset)
333 }
334 log.WarnErrorf(err, "backend conn [%p] to %s, db-%d writer-[%d] exit",
335 bc, bc.addr, bc.database, round)
336 }()
337 c, tasks, err := bc.newBackendReader(round, bc.config)
338 if err != nil {
339 return err
340 }
341 defer close(tasks)
342
343 defer bc.state.Set(0)
344
345 bc.state.Set(stateConnected)
346 bc.retry.fails = 0
347 bc.retry.delay.Reset()
348
349 p := c.FlushEncoder()
350 p.MaxInterval = time.Millisecond
351 p.MaxBuffered = cap(tasks) / 2
352
353 for r := range bc.input {
354 if r.IsReadOnly() && r.IsBroken() {
355 bc.setResponse(r, nil, ErrRequestIsBroken)
356 continue
357 }
358 if err := p.EncodeMultiBulk(r.Multi); err != nil {
359 return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
360 }
361 if err := p.Flush(len(bc.input) == 0); err != nil {
362 return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
363 } else {
364 tasks <- r
365 }
366 }
367 return nil
368}
369
370type sharedBackendConn struct {
371 addr string

Callers 1

runMethod · 0.95

Calls 11

setResponseMethod · 0.95
newBackendReaderMethod · 0.95
WarnErrorfMethod · 0.80
FlushEncoderMethod · 0.80
IsReadOnlyMethod · 0.80
IsBrokenMethod · 0.80
ErrorfMethod · 0.80
ResetMethod · 0.65
SetMethod · 0.45
EncodeMultiBulkMethod · 0.45
FlushMethod · 0.45

Tested by

no test coverage detected