Process is low-level API to process message bypassing the internal queue.
(ctx context.Context, job *Job)
| 371 | |
| 372 | // Process is low-level API to process message bypassing the internal queue. |
| 373 | func (c *Consumer) Process(ctx context.Context, job *Job) error { |
| 374 | atomic.AddUint32(&c.inFlight, 1) |
| 375 | |
| 376 | if job.Delay > 0 { |
| 377 | if err := c.q.AddJob(ctx, job); err != nil { |
| 378 | return err |
| 379 | } |
| 380 | return nil |
| 381 | } |
| 382 | |
| 383 | if job.Err != nil { |
| 384 | job.Delay = -1 |
| 385 | c.Put(ctx, job) |
| 386 | return job.Err |
| 387 | } |
| 388 | |
| 389 | ctx, evt := c.beforeProcessJob(ctx, job) |
| 390 | job.evt = evt |
| 391 | |
| 392 | jobErr := c.opt.Handler.HandleJob(ctx, job) |
| 393 | if jobErr == ErrAsyncTask { |
| 394 | return ErrAsyncTask |
| 395 | } |
| 396 | |
| 397 | job.Err = jobErr |
| 398 | c.Put(ctx, job) |
| 399 | |
| 400 | return job.Err |
| 401 | } |
| 402 | |
| 403 | func (c *Consumer) Put(ctx context.Context, job *Job) { |
| 404 | c.afterProcessJob(ctx, job) |
no test coverage detected