MCPcopy
hub / github.com/kopia/kopia / Process

Method Process

internal/parallelwork/parallel_work_queue.go:61–95  ·  view source on GitHub ↗

Process starts N workers, which will be processing elements in the queue until the queue is empty and all workers are idle or until any of the workers returns an error.

(ctx context.Context, workers int)

Source from the content-addressed store, hash-verified

59// Process starts N workers, which will be processing elements in the queue until the queue
60// is empty and all workers are idle or until any of the workers returns an error.
61func (v *Queue) Process(ctx context.Context, workers int) error {
62 defer v.reportProgress(ctx)
63
64 eg, ctx := errgroup.WithContext(ctx)
65
66 for range workers {
67 eg.Go(func() error {
68 for {
69 select {
70 case <-ctx.Done():
71 // context canceled - some other worker returned an error.
72 return ctx.Err()
73
74 default:
75 callback := v.dequeue(ctx)
76 if callback == nil {
77 // no more work, shut down.
78 return nil
79 }
80
81 err := callback()
82
83 v.completed(ctx)
84
85 if err != nil {
86 return err
87 }
88 }
89 }
90 })
91 }
92
93 //nolint:wrapcheck
94 return eg.Wait()
95}
96
97func (v *Queue) dequeue(ctx context.Context) CallbackFunc {
98 v.monitor.L.Lock()

Callers 8

TestProcessWithErrorFunction · 0.95
TestWaitForActiveWorkersFunction · 0.95
TestProgressCallbackFunction · 0.95
CleanupOldDataFunction · 0.95
ListBlobsMethod · 0.95
makeVerifyWalkerFuncMethod · 0.45

Calls 7

reportProgressMethod · 0.95
dequeueMethod · 0.95
completedMethod · 0.95
callbackFuncType · 0.85
ErrMethod · 0.80
DoneMethod · 0.65
WaitMethod · 0.65

Tested by 5

TestProcessWithErrorFunction · 0.76
TestWaitForActiveWorkersFunction · 0.76
TestProgressCallbackFunction · 0.76