MCPcopy
hub / github.com/uptrace/uptrace / Process

Method Process

pkg/taskq/consumer.go:373–401  ·  view source on GitHub ↗

Process is a low-level API for processing a job directly, bypassing the internal queue.

(ctx context.Context, job *Job)

Source from the content-addressed store, hash-verified

371
372// Process is low-level API to process message bypassing the internal queue.
373func (c *Consumer) Process(ctx context.Context, job *Job) error {
374 atomic.AddUint32(&c.inFlight, 1)
375
376 if job.Delay > 0 {
377 if err := c.q.AddJob(ctx, job); err != nil {
378 return err
379 }
380 return nil
381 }
382
383 if job.Err != nil {
384 job.Delay = -1
385 c.Put(ctx, job)
386 return job.Err
387 }
388
389 ctx, evt := c.beforeProcessJob(ctx, job)
390 job.evt = evt
391
392 jobErr := c.opt.Handler.HandleJob(ctx, job)
393 if jobErr == ErrAsyncTask {
394 return ErrAsyncTask
395 }
396
397 job.Err = jobErr
398 c.Put(ctx, job)
399
400 return job.Err
401}
402
403func (c *Consumer) Put(ctx context.Context, job *Job) {
404 c.afterProcessJob(ctx, job)

Callers 2

ProcessOneMethod · 0.95
workerMethod · 0.95

Calls 4

PutMethod · 0.95
beforeProcessJobMethod · 0.95
AddJobMethod · 0.65
HandleJobMethod · 0.65

Tested by

no test coverage detected