Initialize the ZenML server.
()
| 192 | |
@app.on_event("startup")
async def initialize() -> None:
    """Initialize the ZenML server.

    Registered as the FastAPI ``startup`` event handler. It sizes the default
    worker-thread limiter from the server configuration, initializes the
    server components in a fixed order, optionally reports the workspace
    status for cloud deployments, optionally starts the event loop lag
    monitor when debug logging is enabled, and finally registers the event
    handlers.
    """
    cfg = server_config()
    # Set the maximum number of worker threads
    to_thread.current_default_thread_limiter().total_tokens = (
        cfg.thread_pool_size
    )
    # IMPORTANT: these need to be run before the fastapi app starts, to avoid
    # race conditions
    await initialize_request_manager()
    initialize_zen_store()
    # Instrument the SQL store with OpenTelemetry after it has been initialized.
    instrument_sqlalchemy_store(store=zen_store())
    # The resource pool store is set up exactly once, directly after the zen
    # store, so every later initialization step can rely on it.
    initialize_resource_pool_store()
    service_connector_registry.register_builtin_service_connectors()
    initialize_rbac()
    initialize_feature_gate()
    initialize_workload_manager()
    initialize_snapshot_executor()
    await initialize_snapshot_run_dispatcher()
    initialize_artifact_store_cache()
    await initialize_streaming()
    initialize_secure_headers()
    if cfg.deployment_type == ServerDeploymentType.CLOUD:
        # Send a workspace status update to the Cloud API to indicate that the
        # ZenML server is running or to update the version and server URL.
        send_pro_workspace_status_update()

    # The lag monitor is a debugging aid, so it is only started when the
    # logger is configured at DEBUG level.
    if logger.isEnabledFor(logging.DEBUG):
        start_event_loop_lag_monitor()

    await register_event_handlers()
| 227 | |
| 228 | |
| 229 | @app.on_event("shutdown") |
no test coverage detected