MCPcopy Index your code
hub / github.com/Zipstack/unstract / get_celery_setting

Function get_celery_setting

workers/shared/models/worker_models.py:81–133  ·  view source on GitHub ↗

Get Celery configuration setting with 4-tier hierarchy resolution. Resolution order (most specific wins): 1. Command-line arguments (--pool=prefork, --concurrency=20, etc.) - HIGHEST 2. {WORKER_TYPE}_{SETTING_NAME} (worker-specific env override) 3. CELERY_{SETTING_NAME} (global env

(
    setting_name: str, worker_type: WorkerType, default: Any, setting_type: type = str
)

Source from the content-addressed store, hash-verified

79
80
81def get_celery_setting(
82 setting_name: str, worker_type: WorkerType, default: Any, setting_type: type = str
83) -> Any:
84 """Get Celery configuration setting with 4-tier hierarchy resolution.
85
86 Resolution order (most specific wins):
87 1. Command-line arguments (--pool=prefork, --concurrency=20, etc.) - HIGHEST
88 2. {WORKER_TYPE}_{SETTING_NAME} (worker-specific env override)
89 3. CELERY_{SETTING_NAME} (global env override)
90 4. default (Celery standard/provided default) - LOWEST
91
92 Args:
93 setting_name: The setting name (e.g., "TASK_TIME_LIMIT")
94 worker_type: Worker type for worker-specific overrides
95 default: Default value if no settings are found
96 setting_type: Type to convert the setting value to (str, int, bool, etc.)
97
98 Returns:
99 Resolved configuration value with proper type conversion
100
101 Examples:
102 # Check --pool=, then FILE_PROCESSING_POOL_TYPE, then CELERY_POOL_TYPE, then default
103 pool = get_celery_setting("POOL_TYPE", WorkerType.FILE_PROCESSING, "prefork", str)
104
105 # Check --concurrency=, then CALLBACK_CONCURRENCY, then CELERY_CONCURRENCY, then default
106 concurrency = get_celery_setting("CONCURRENCY", WorkerType.CALLBACK, 4, int)
107 """
108 # 1. Check command-line arguments first (highest priority)
109 cmdline_settings = _get_cmdline_celery_settings()
110 if setting_name in cmdline_settings:
111 cmdline_value = cmdline_settings[setting_name]
112 converted_value = _convert_setting_value(cmdline_value, setting_type)
113 if converted_value is not None:
114 return converted_value
115
116 # 2. Check worker-specific setting (high priority)
117 worker_specific_key = f"{worker_type.name}_{setting_name}"
118 worker_value = os.getenv(worker_specific_key)
119 if worker_value is not None:
120 converted_value = _convert_setting_value(worker_value, setting_type)
121 if converted_value is not None:
122 return converted_value
123
124 # 3. Check global Celery setting (medium priority)
125 global_key = f"CELERY_{setting_name}"
126 global_value = os.getenv(global_key)
127 if global_value is not None:
128 converted_value = _convert_setting_value(global_value, setting_type)
129 if converted_value is not None:
130 return converted_value
131
132 # 4. Use provided default (lowest priority)
133 return default
134
135
136def _convert_setting_value(value: str, setting_type: type) -> Any:

Callers 6

startMethod · 0.90
worker.pyFile · 0.90
to_celery_dictMethod · 0.85

Calls 2

_convert_setting_valueFunction · 0.85

Tested by

no test coverage detected