Initialize the workload manager component. If the source can't be loaded, this does not fail; it only logs a warning.
()
| 199 | |
| 200 | |
def initialize_workload_manager() -> None:
    """Set up the module-level workload manager, if one is configured.

    The implementation source is read from the server configuration. When
    no source is configured, nothing happens. Otherwise the class is loaded
    and validated against ``WorkloadManagerInterface``, instantiated without
    arguments, and stored in the module-level ``_workload_manager``.

    A ``ModuleNotFoundError`` or ``KeyError`` raised while loading the class
    is not propagated: a warning is logged and ``_workload_manager`` is left
    untouched. Any other exception is raised to the caller.
    """
    global _workload_manager

    source = server_config().workload_manager_implementation_source
    if not source:
        # Nothing configured, so there is nothing to initialize.
        return

    # Imported only once a source is actually configured.
    from zenml.utils import source_utils

    try:
        manager_cls: Type[WorkloadManagerInterface] = (
            source_utils.load_and_validate_class(
                source=source, expected_class=WorkloadManagerInterface
            )
        )
    except (ModuleNotFoundError, KeyError):
        logger.warning("Unable to load workload manager source.")
        return

    # Instantiation happens outside the ``try`` so that errors raised by the
    # constructor are not swallowed by the handler above.
    _workload_manager = manager_cls()
| 221 | |
| 222 | |
| 223 | def stream_broker() -> StreamBroker: |
no test coverage detected