Return app-level max threads from config with support for 'auto' value. Args: config: Configuration dictionary Returns: Maximum number of threads for parallel operations
(config: dict)
| 287 | |
| 288 | |
| 289 | def get_app_max_threads(config: dict) -> int: |
| 290 | """Return app-level max threads from config with support for 'auto' value. |
| 291 | |
| 292 | Args: |
| 293 | config: Configuration dictionary |
| 294 | |
| 295 | Returns: |
| 296 | Maximum number of threads for parallel operations |
| 297 | """ |
| 298 | default_max_threads = calculate_default_max_threads() |
| 299 | config_path = ["app", "max_threads"] |
| 300 | |
| 301 | if not traverse_config_path(config=config, config_path=config_path): |
| 302 | log_config_debug( |
| 303 | f"max_threads is not found in {config_path_to_string(config_path=config_path)}. " |
| 304 | f"Using default max_threads: {default_max_threads} (auto) ...", |
| 305 | ) |
| 306 | return default_max_threads |
| 307 | |
| 308 | max_threads_config = get_config_value(config=config, config_path=config_path) |
| 309 | return parse_max_threads_value(max_threads_config, default_max_threads) |
| 310 | |
| 311 | |
| 312 | def get_usage_tracking_enabled(config: dict) -> bool: |
nothing calls this directly
no test coverage detected