(round int)
| 326 | } |
| 327 | |
| 328 | func (bc *BackendConn) loopWriter(round int) (err error) { |
| 329 | defer func() { |
| 330 | for i := len(bc.input); i != 0; i-- { |
| 331 | r := <-bc.input |
| 332 | bc.setResponse(r, nil, ErrBackendConnReset) |
| 333 | } |
| 334 | log.WarnErrorf(err, "backend conn [%p] to %s, db-%d writer-[%d] exit", |
| 335 | bc, bc.addr, bc.database, round) |
| 336 | }() |
| 337 | c, tasks, err := bc.newBackendReader(round, bc.config) |
| 338 | if err != nil { |
| 339 | return err |
| 340 | } |
| 341 | defer close(tasks) |
| 342 | |
| 343 | defer bc.state.Set(0) |
| 344 | |
| 345 | bc.state.Set(stateConnected) |
| 346 | bc.retry.fails = 0 |
| 347 | bc.retry.delay.Reset() |
| 348 | |
| 349 | p := c.FlushEncoder() |
| 350 | p.MaxInterval = time.Millisecond |
| 351 | p.MaxBuffered = cap(tasks) / 2 |
| 352 | |
| 353 | for r := range bc.input { |
| 354 | if r.IsReadOnly() && r.IsBroken() { |
| 355 | bc.setResponse(r, nil, ErrRequestIsBroken) |
| 356 | continue |
| 357 | } |
| 358 | if err := p.EncodeMultiBulk(r.Multi); err != nil { |
| 359 | return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err)) |
| 360 | } |
| 361 | if err := p.Flush(len(bc.input) == 0); err != nil { |
| 362 | return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err)) |
| 363 | } else { |
| 364 | tasks <- r |
| 365 | } |
| 366 | } |
| 367 | return nil |
| 368 | } |
| 369 | |
| 370 | type sharedBackendConn struct { |
| 371 | addr string |
no test coverage detected