(ctx context.Context, rep repo.DirectRepositoryWriter, prefixes []blob.ID, processedBlobCount, recoveredContentCount *atomic.Int32)
| 98 | } |
| 99 | |
| 100 | func (c *commandIndexRecover) recoverIndexesFromAllPacks(ctx context.Context, rep repo.DirectRepositoryWriter, prefixes []blob.ID, processedBlobCount, recoveredContentCount *atomic.Int32) error { |
| 101 | var ( |
| 102 | discoveredBlobCount atomic.Int32 |
| 103 | discoveringBlobCount atomic.Int32 |
| 104 | tt timetrack.Throttle |
| 105 | ) |
| 106 | |
| 107 | // recover indexes from all pack blobs in parallel. |
| 108 | // this is actually quite fast since we typically need to read only 8KB from each blob. |
| 109 | eg, ctx := errgroup.WithContext(ctx) |
| 110 | |
| 111 | go func() { |
| 112 | for _, prefix := range prefixes { |
| 113 | //nolint:errcheck |
| 114 | rep.BlobStorage().ListBlobs(ctx, prefix, func(_ blob.Metadata) error { |
| 115 | discoveringBlobCount.Add(1) |
| 116 | return nil |
| 117 | }) |
| 118 | } |
| 119 | |
| 120 | discoveredBlobCount.Store(discoveringBlobCount.Load()) |
| 121 | }() |
| 122 | |
| 123 | est := timetrack.Start() |
| 124 | |
| 125 | blobCh := make(chan blob.Metadata) |
| 126 | |
| 127 | // goroutine to populate blob metadata into a channel. |
| 128 | eg.Go(func() error { |
| 129 | defer close(blobCh) |
| 130 | |
| 131 | for _, prefix := range prefixes { |
| 132 | if err := rep.BlobStorage().ListBlobs(ctx, prefix, func(bm blob.Metadata) error { |
| 133 | blobCh <- bm |
| 134 | return nil |
| 135 | }); err != nil { |
| 136 | return errors.Wrapf(err, "error listing blobs with prefix %q", prefix) |
| 137 | } |
| 138 | } |
| 139 | |
| 140 | return nil |
| 141 | }) |
| 142 | |
| 143 | // N goroutines to recover from incoming blobs. |
| 144 | for worker := range c.parallel { |
| 145 | eg.Go(func() error { |
| 146 | cnt := 0 |
| 147 | |
| 148 | for bm := range blobCh { |
| 149 | finishedBlobs := processedBlobCount.Load() |
| 150 | |
| 151 | log(ctx).Debugf("worker %v got %v", worker, cnt) |
| 152 | |
| 153 | cnt++ |
| 154 | |
| 155 | if tt.ShouldOutput(time.Second) { |
| 156 | if disc := discoveredBlobCount.Load(); disc > 0 { |
| 157 | e, ok := est.Estimate(float64(finishedBlobs), float64(disc)) |
no test coverage detected