(tasks *RequestChan)
| 194 | } |
| 195 | |
| 196 | func (s *Session) loopWriter(tasks *RequestChan) (err error) { |
| 197 | defer func() { |
| 198 | s.CloseWithError(err) |
| 199 | tasks.PopFrontAllVoid(func(r *Request) { |
| 200 | s.incrOpFails(r, nil) |
| 201 | }) |
| 202 | s.flushOpStats(true) |
| 203 | }() |
| 204 | |
| 205 | var ( |
| 206 | breakOnFailure = s.config.SessionBreakOnFailure |
| 207 | maxPipelineLen = s.config.SessionMaxPipeline |
| 208 | ) |
| 209 | |
| 210 | p := s.Conn.FlushEncoder() |
| 211 | p.MaxInterval = time.Millisecond |
| 212 | p.MaxBuffered = maxPipelineLen / 2 |
| 213 | |
| 214 | return tasks.PopFrontAll(func(r *Request) error { |
| 215 | resp, err := s.handleResponse(r) |
| 216 | if err != nil { |
| 217 | resp = redis.NewErrorf("ERR handle response, %s", err) |
| 218 | if breakOnFailure { |
| 219 | s.Conn.Encode(resp, true) |
| 220 | return s.incrOpFails(r, err) |
| 221 | } |
| 222 | } |
| 223 | if err := p.Encode(resp); err != nil { |
| 224 | return s.incrOpFails(r, err) |
| 225 | } |
| 226 | fflush := tasks.IsEmpty() |
| 227 | if err := p.Flush(fflush); err != nil { |
| 228 | return s.incrOpFails(r, err) |
| 229 | } else { |
| 230 | s.incrOpStats(r, resp.Type) |
| 231 | } |
| 232 | if fflush { |
| 233 | s.flushOpStats(false) |
| 234 | } |
| 235 | return nil |
| 236 | }) |
| 237 | } |
| 238 | |
| 239 | func (s *Session) handleResponse(r *Request) (*redis.Resp, error) { |
| 240 | r.Batch.Wait() |
no test coverage detected