()
| 1100 | } |
| 1101 | |
| 1102 | func (t *QueueManager) updateShardsLoop() { |
| 1103 | defer t.wg.Done() |
| 1104 | |
| 1105 | ticker := time.NewTicker(shardUpdateDuration) |
| 1106 | defer ticker.Stop() |
| 1107 | for { |
| 1108 | select { |
| 1109 | case <-ticker.C: |
| 1110 | desiredShards := t.calculateDesiredShards() |
| 1111 | if !t.shouldReshard(desiredShards) { |
| 1112 | continue |
| 1113 | } |
| 1114 | // Resharding can take some time, and we want this loop |
| 1115 | // to stay close to shardUpdateDuration. |
| 1116 | select { |
| 1117 | case t.reshardChan <- desiredShards: |
| 1118 | t.logger.Info("Remote storage resharding", "from", t.numShards, "to", desiredShards) |
| 1119 | t.numShards = desiredShards |
| 1120 | default: |
| 1121 | t.logger.Info("Currently resharding, skipping.") |
| 1122 | } |
| 1123 | case <-t.quit: |
| 1124 | return |
| 1125 | } |
| 1126 | } |
| 1127 | } |
| 1128 | |
| 1129 | // shouldReshard returns whether resharding should occur. |
| 1130 | func (t *QueueManager) shouldReshard(desiredShards int) bool { |
no test coverage detected