(maxScaledThreads int, scale chan *frankenPHPContext, done chan struct{}, mstate *state.ThreadState)
| 144 | } |
| 145 | |
| 146 | func startUpscalingThreads(maxScaledThreads int, scale chan *frankenPHPContext, done chan struct{}, mstate *state.ThreadState) { |
| 147 | for { |
| 148 | scalingMu.Lock() |
| 149 | scaledThreadCount := len(autoScaledThreads) |
| 150 | scalingMu.Unlock() |
| 151 | if scaledThreadCount >= maxScaledThreads { |
| 152 | // we have reached max_threads, check again later |
| 153 | select { |
| 154 | case <-done: |
| 155 | return |
| 156 | case <-time.After(downScaleCheckTime): |
| 157 | continue |
| 158 | } |
| 159 | } |
| 160 | |
| 161 | select { |
| 162 | case fc := <-scale: |
| 163 | timeSinceStalled := time.Since(fc.startedAt) |
| 164 | |
| 165 | // if the request has not been stalled long enough, wait and repeat |
| 166 | if timeSinceStalled < minStallTime { |
| 167 | select { |
| 168 | case <-done: |
| 169 | return |
| 170 | case <-time.After(minStallTime - timeSinceStalled): |
| 171 | continue |
| 172 | } |
| 173 | } |
| 174 | |
| 175 | // if the request has been stalled long enough, scale |
| 176 | if fc.worker == nil { |
| 177 | scaleRegularThread(done, mstate) |
| 178 | continue |
| 179 | } |
| 180 | |
| 181 | // check for max worker threads here again in case requests overflowed while waiting |
| 182 | if fc.worker.isAtThreadLimit() { |
| 183 | if globalLogger.Enabled(globalCtx, slog.LevelInfo) { |
| 184 | globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "cannot scale worker thread, max threads reached for worker", slog.String("worker", fc.worker.name)) |
| 185 | } |
| 186 | |
| 187 | continue |
| 188 | } |
| 189 | |
| 190 | scaleWorkerThread(fc.worker, done, mstate) |
| 191 | case <-done: |
| 192 | return |
| 193 | } |
| 194 | } |
| 195 | } |
| 196 | |
| 197 | func startDownScalingThreads(done chan struct{}) { |
| 198 | for { |
no test coverage detected