MCPcopy
hub / github.com/zenml-io/zenml / initialize_workload_manager

Function initialize_workload_manager

src/zenml/zen_server/utils.py:201–220  ·  view source on GitHub ↗

Initialize the workload manager component. This does not fail if the source can't be loaded but only logs a warning.

()

Source from the content-addressed store, hash-verified

199
200
201def initialize_workload_manager() -> None:
202 """Initialize the workload manager component.
203
204 This does not fail if the source can't be loaded but only logs a warning.
205 """
206 global _workload_manager
207
208 if source := server_config().workload_manager_implementation_source:
209 from zenml.utils import source_utils
210
211 try:
212 workload_manager_class: Type[WorkloadManagerInterface] = (
213 source_utils.load_and_validate_class(
214 source=source, expected_class=WorkloadManagerInterface
215 )
216 )
217 except (ModuleNotFoundError, KeyError):
218 logger.warning("Unable to load workload manager source.")
219 else:
220 _workload_manager = workload_manager_class()
221
222
223def stream_broker() -> StreamBroker:

Callers 1

initializeFunction · 0.90

Calls 1

server_configFunction · 0.85

Tested by

no test coverage detected