initDistributed validates distributed mode prerequisites and initializes NATS, object storage, node registry, and instance identity. Returns nil if distributed mode is not enabled. configLoader is used by the SmartRouter to compute concurrency-group anti-affinity at placement time (#9659); it may be
(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoader *config.ModelConfigLoader)
| 76 | // configLoader is used by the SmartRouter to compute concurrency-group |
| 77 | // anti-affinity at placement time (#9659); it may be nil in tests. |
| 78 | func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoader *config.ModelConfigLoader) (*DistributedServices, error) { |
| 79 | if !cfg.Distributed.Enabled { |
| 80 | return nil, nil |
| 81 | } |
| 82 | |
| 83 | xlog.Info("Distributed mode enabled — validating prerequisites") |
| 84 | |
| 85 | // Validate distributed config (NATS URL, S3 credential pairing, durations, etc.) |
| 86 | if err := cfg.Distributed.Validate(); err != nil { |
| 87 | return nil, err |
| 88 | } |
| 89 | |
| 90 | // Validate PostgreSQL is configured (auth DB must be PostgreSQL for distributed mode) |
| 91 | if !cfg.Auth.Enabled { |
| 92 | return nil, fmt.Errorf("distributed mode requires authentication to be enabled (--auth / LOCALAI_AUTH=true)") |
| 93 | } |
| 94 | if !isPostgresURL(cfg.Auth.DatabaseURL) { |
| 95 | return nil, fmt.Errorf("distributed mode requires PostgreSQL for auth database (got %q)", sanitize.URL(cfg.Auth.DatabaseURL)) |
| 96 | } |
| 97 | |
| 98 | // Generate instance ID if not set |
| 99 | if cfg.Distributed.InstanceID == "" { |
| 100 | cfg.Distributed.InstanceID = uuid.New().String() |
| 101 | } |
| 102 | xlog.Info("Distributed instance", "id", cfg.Distributed.InstanceID) |
| 103 | |
| 104 | // Connect to NATS |
| 105 | natsAuth := cfg.Distributed.NatsAuthConfig() |
| 106 | if natsAuth.RequireAuth && (natsAuth.ServiceUserJWT == "" || natsAuth.ServiceUserSeed == "") { |
| 107 | return nil, fmt.Errorf("LOCALAI_NATS_REQUIRE_AUTH requires LOCALAI_NATS_SERVICE_JWT and LOCALAI_NATS_SERVICE_SEED") |
| 108 | } |
| 109 | natsOpts := cfg.Distributed.NatsMessagingOptions("", "") |
| 110 | natsClient, err := messaging.New(cfg.Distributed.NatsURL, natsOpts...) |
| 111 | if err != nil { |
| 112 | return nil, fmt.Errorf("connecting to NATS: %w", err) |
| 113 | } |
| 114 | xlog.Info("Connected to NATS", "url", sanitize.URL(cfg.Distributed.NatsURL)) |
| 115 | |
| 116 | // Ensure NATS is closed if any subsequent initialization step fails. |
| 117 | success := false |
| 118 | defer func() { |
| 119 | if !success { |
| 120 | natsClient.Close() |
| 121 | } |
| 122 | }() |
| 123 | |
| 124 | // Initialize object storage |
| 125 | var store storage.ObjectStore |
| 126 | if cfg.Distributed.StorageURL != "" { |
| 127 | if cfg.Distributed.StorageBucket == "" { |
| 128 | return nil, fmt.Errorf("distributed storage bucket must be set when storage URL is configured") |
| 129 | } |
| 130 | s3Store, err := storage.NewS3Store(context.Background(), storage.S3Config{ |
| 131 | Endpoint: cfg.Distributed.StorageURL, |
| 132 | Region: cfg.Distributed.StorageRegion, |
| 133 | Bucket: cfg.Distributed.StorageBucket, |
| 134 | AccessKeyID: cfg.Distributed.StorageAccessKey, |
| 135 | SecretAccessKey: cfg.Distributed.StorageSecretKey, |
no test coverage detected