deactivateThreads checks all threads and removes those that have been inactive for too long
()
| 207 | |
| 208 | // deactivateThreads checks all threads and removes those that have been inactive for too long |
| 209 | func deactivateThreads() { |
| 210 | stoppedThreadCount := 0 |
| 211 | scalingMu.Lock() |
| 212 | defer scalingMu.Unlock() |
| 213 | for i := len(autoScaledThreads) - 1; i >= 0; i-- { |
| 214 | thread := autoScaledThreads[i] |
| 215 | |
| 216 | // the thread might have been stopped otherwise, remove it |
| 217 | if thread.state.Is(state.Reserved) { |
| 218 | autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) |
| 219 | continue |
| 220 | } |
| 221 | |
| 222 | waitTime := thread.state.WaitTime() |
| 223 | if stoppedThreadCount > maxTerminationCount || waitTime == 0 { |
| 224 | continue |
| 225 | } |
| 226 | |
| 227 | // convert threads to inactive if they have been idle for too long |
| 228 | if thread.state.Is(state.Ready) && waitTime > maxIdleTime.Milliseconds() { |
| 229 | convertToInactiveThread(thread) |
| 230 | stoppedThreadCount++ |
| 231 | autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) |
| 232 | |
| 233 | if globalLogger.Enabled(globalCtx, slog.LevelInfo) { |
| 234 | globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "downscaling thread", slog.Int("thread", thread.threadIndex), slog.Int64("wait_time", waitTime), slog.Int("num_threads", len(autoScaledThreads))) |
| 235 | } |
| 236 | |
| 237 | continue |
| 238 | } |
| 239 | |
| 240 | // TODO: Completely stopping threads is more memory efficient |
| 241 | // Some PECL extensions like #1296 will prevent threads from fully stopping (they leak memory) |
| 242 | // Reactivate this if there is a better solution or workaround |
| 243 | // if thread.state.Is(state.Inactive) && waitTime > maxThreadIdleTime.Milliseconds() { |
| 244 | // logger.LogAttrs(nil, slog.LevelDebug, "auto-stopping thread", slog.Int("thread", thread.threadIndex)) |
| 245 | // thread.shutdown() |
| 246 | // stoppedThreadCount++ |
| 247 | // autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) |
| 248 | // continue |
| 249 | // } |
| 250 | } |
| 251 | } |