Process starts N workers, which will be processing elements in the queue until the queue is empty and all workers are idle or until any of the workers returns an error.
(ctx context.Context, workers int)
| 59 | // Process starts N workers, which will be processing elements in the queue until the queue |
| 60 | // is empty and all workers are idle or until any of the workers returns an error. |
| 61 | func (v *Queue) Process(ctx context.Context, workers int) error { |
| 62 | defer v.reportProgress(ctx) |
| 63 | |
| 64 | eg, ctx := errgroup.WithContext(ctx) |
| 65 | |
| 66 | for range workers { |
| 67 | eg.Go(func() error { |
| 68 | for { |
| 69 | select { |
| 70 | case <-ctx.Done(): |
| 71 | // context canceled - some other worker returned an error. |
| 72 | return ctx.Err() |
| 73 | |
| 74 | default: |
| 75 | callback := v.dequeue(ctx) |
| 76 | if callback == nil { |
| 77 | // no more work, shut down. |
| 78 | return nil |
| 79 | } |
| 80 | |
| 81 | err := callback() |
| 82 | |
| 83 | v.completed(ctx) |
| 84 | |
| 85 | if err != nil { |
| 86 | return err |
| 87 | } |
| 88 | } |
| 89 | } |
| 90 | }) |
| 91 | } |
| 92 | |
| 93 | //nolint:wrapcheck |
| 94 | return eg.Wait() |
| 95 | } |
| 96 | |
| 97 | func (v *Queue) dequeue(ctx context.Context) CallbackFunc { |
| 98 | v.monitor.L.Lock() |