StatBlobsParallelHelper is for use by blobserver implementations that want to issue stats in parallel. This runs worker in multiple goroutines (bounded by gate), but calls fn in serial, per the BlobStatter contract, and stops once there's a failure. The worker func should return two zero values to
(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error, gate *syncutil.Gate, worker func(blob.Ref) (blob.SizedRef, error))
| 63 | // doesn't exist. (This is different than the StatBlob func, which |
| 64 | // returns os.ErrNotExist) |
| 65 | func StatBlobsParallelHelper(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error, |
| 66 | gate *syncutil.Gate, worker func(blob.Ref) (blob.SizedRef, error)) error { |
| 67 | if len(blobs) == 0 { |
| 68 | return nil |
| 69 | } |
| 70 | |
| 71 | ctx, cancel := context.WithCancel(ctx) |
| 72 | defer cancel() |
| 73 | |
| 74 | var fnMu sync.Mutex // serializes calls to fn |
| 75 | |
| 76 | var wg syncutil.Group |
| 77 | Blobs: |
| 78 | for i := range blobs { |
| 79 | gate.Start() |
| 80 | b := blobs[i] |
| 81 | |
| 82 | select { |
| 83 | case <-ctx.Done(): |
| 84 | // If a previous failed, stop. |
| 85 | break Blobs |
| 86 | default: |
| 87 | } |
| 88 | |
| 89 | wg.Go(func() error { |
| 90 | defer gate.Done() |
| 91 | |
| 92 | sb, err := worker(b) |
| 93 | if err != nil { |
| 94 | cancel() |
| 95 | return err |
| 96 | } |
| 97 | if !sb.Valid() { |
| 98 | // not found. |
| 99 | return nil |
| 100 | } |
| 101 | |
| 102 | fnMu.Lock() |
| 103 | defer fnMu.Unlock() |
| 104 | |
| 105 | select { |
| 106 | case <-ctx.Done(): |
| 107 | // If a previous failed, stop. |
| 108 | return ctx.Err() |
| 109 | default: |
| 110 | } |
| 111 | |
| 112 | if err := fn(sb); err != nil { |
| 113 | cancel() // stop others from running |
| 114 | return err |
| 115 | } |
| 116 | return nil |
| 117 | }) |
| 118 | } |
| 119 | |
| 120 | if err := wg.Err(); err != nil { |
| 121 | return err |
| 122 | } |