Get Celery configuration setting with 4-tier hierarchy resolution. Resolution order (most specific wins): 1. Command-line arguments (--pool=prefork, --concurrency=20, etc.) - HIGHEST 2. {WORKER_TYPE}_{SETTING_NAME} (worker-specific env override) 3. CELERY_{SETTING_NAME} (global env
(
setting_name: str, worker_type: WorkerType, default: Any, setting_type: type = str
)
| 79 | |
| 80 | |
| 81 | def get_celery_setting( |
| 82 | setting_name: str, worker_type: WorkerType, default: Any, setting_type: type = str |
| 83 | ) -> Any: |
| 84 | """Get Celery configuration setting with 4-tier hierarchy resolution. |
| 85 | |
| 86 | Resolution order (most specific wins): |
| 87 | 1. Command-line arguments (--pool=prefork, --concurrency=20, etc.) - HIGHEST |
| 88 | 2. {WORKER_TYPE}_{SETTING_NAME} (worker-specific env override) |
| 89 | 3. CELERY_{SETTING_NAME} (global env override) |
| 90 | 4. default (Celery standard/provided default) - LOWEST |
| 91 | |
| 92 | Args: |
| 93 | setting_name: The setting name (e.g., "TASK_TIME_LIMIT") |
| 94 | worker_type: Worker type for worker-specific overrides |
| 95 | default: Default value if no settings are found |
| 96 | setting_type: Type to convert the setting value to (str, int, bool, etc.) |
| 97 | |
| 98 | Returns: |
| 99 | Resolved configuration value with proper type conversion |
| 100 | |
| 101 | Examples: |
| 102 | # Check --pool=, then FILE_PROCESSING_POOL_TYPE, then CELERY_POOL_TYPE, then default |
| 103 | pool = get_celery_setting("POOL_TYPE", WorkerType.FILE_PROCESSING, "prefork", str) |
| 104 | |
| 105 | # Check --concurrency=, then CALLBACK_CONCURRENCY, then CELERY_CONCURRENCY, then default |
| 106 | concurrency = get_celery_setting("CONCURRENCY", WorkerType.CALLBACK, 4, int) |
| 107 | """ |
| 108 | # 1. Check command-line arguments first (highest priority) |
| 109 | cmdline_settings = _get_cmdline_celery_settings() |
| 110 | if setting_name in cmdline_settings: |
| 111 | cmdline_value = cmdline_settings[setting_name] |
| 112 | converted_value = _convert_setting_value(cmdline_value, setting_type) |
| 113 | if converted_value is not None: |
| 114 | return converted_value |
| 115 | |
| 116 | # 2. Check worker-specific setting (high priority) |
| 117 | worker_specific_key = f"{worker_type.name}_{setting_name}" |
| 118 | worker_value = os.getenv(worker_specific_key) |
| 119 | if worker_value is not None: |
| 120 | converted_value = _convert_setting_value(worker_value, setting_type) |
| 121 | if converted_value is not None: |
| 122 | return converted_value |
| 123 | |
| 124 | # 3. Check global Celery setting (medium priority) |
| 125 | global_key = f"CELERY_{setting_name}" |
| 126 | global_value = os.getenv(global_key) |
| 127 | if global_value is not None: |
| 128 | converted_value = _convert_setting_value(global_value, setting_type) |
| 129 | if converted_value is not None: |
| 130 | return converted_value |
| 131 | |
| 132 | # 4. Use provided default (lowest priority) |
| 133 | return default |
| 134 | |
| 135 | |
| 136 | def _convert_setting_value(value: str, setting_type: type) -> Any: |
no test coverage detected