(ctx context.Context, job *Job)
| 437 | } |
| 438 | |
| 439 | func (c *Consumer) delete(ctx context.Context, job *Job) { |
| 440 | if job.Err != nil { |
| 441 | backend.Error(job.Err, "job failed (dropping)", |
| 442 | "task_name", job.TaskName, |
| 443 | "reserved_count", job.ReservedCount) |
| 444 | |
| 445 | if err := c.opt.Handler.HandleJob(ctx, job); err != nil { |
| 446 | backend.Error(err, "fallback handler failed (dropping)", |
| 447 | "task_name", job.TaskName) |
| 448 | } |
| 449 | } |
| 450 | |
| 451 | if err := c.q.Delete(ctx, job); err != nil { |
| 452 | backend.Error(err, "Delete failed", |
| 453 | "task_name", job.TaskName) |
| 454 | } |
| 455 | atomic.AddUint32(&c.inFlight, ^uint32(0)) |
| 456 | } |
| 457 | |
| 458 | // Purge discards messages from the internal queue. |
| 459 | func (c *Consumer) Purge(ctx context.Context) error { |
no test coverage detected