Load Celery tasks from a plugin.
(plugin_name: str)
| 36 | |
| 37 | |
| 38 | def load_plugin_tasks(plugin_name: str): |
| 39 | """Load Celery tasks from a plugin.""" |
| 40 | plugin = get_plugin(plugin_name) |
| 41 | if plugin and hasattr(plugin, "get_tasks"): |
| 42 | return plugin.get_tasks() |
| 43 | return None |
| 44 | |
| 45 | |
| 46 | def get_all_plugin_tasks(): |
no test coverage detected