()
| 70 | } |
| 71 | |
| 72 | func (s *Service) triggerCompaction() { |
| 73 | var wg sync.WaitGroup |
| 74 | |
| 75 | // NumCPU returns the number of logical CPUs usable by the current process. |
| 76 | // |
| 77 | // The set of available CPUs is checked by querying the operating system |
| 78 | // at process startup. Changes to operating system CPU allocation after |
| 79 | // process startup are not reflected. |
| 80 | numWorkers := runtime.NumCPU() |
| 81 | sem := semaphore.NewWeighted(int64(numWorkers)) |
| 82 | for partID := uint64(0); partID < s.config.PartitionCount; partID++ { |
| 83 | select { |
| 84 | case <-s.ctx.Done(): |
| 85 | break |
| 86 | default: |
| 87 | } |
| 88 | |
| 89 | if err := sem.Acquire(s.ctx, 1); err != nil { |
| 90 | if err != context.Canceled { |
| 91 | s.log.V(3).Printf("[ERROR] Failed to acquire semaphore for DMap compaction: %v", err) |
| 92 | } |
| 93 | continue |
| 94 | } |
| 95 | |
| 96 | wg.Add(1) |
| 97 | go func(id uint64) { |
| 98 | defer wg.Done() |
| 99 | defer sem.Release(1) |
| 100 | s.doCompaction(id) |
| 101 | }(partID) |
| 102 | } |
| 103 | |
| 104 | wg.Wait() |
| 105 | } |
| 106 | |
| 107 | func (s *Service) compactionWorker() { |
| 108 | defer s.wg.Done() |
no test coverage detected