MCPcopy Index your code
hub / github.com/mudler/LocalAI / initDistributed

Function initDistributed

core/application/distributed.go:78–409  ·  view source on GitHub ↗

initDistributed validates distributed mode prerequisites and initializes NATS, object storage, node registry, and instance identity. Returns nil if distributed mode is not enabled. configLoader is used by the SmartRouter to compute concurrency-group anti-affinity at placement time (#9659); it may be nil in tests.

(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoader *config.ModelConfigLoader)

Source from the content-addressed store, hash-verified

76// configLoader is used by the SmartRouter to compute concurrency-group
77// anti-affinity at placement time (#9659); it may be nil in tests.
78func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoader *config.ModelConfigLoader) (*DistributedServices, error) {
79 if !cfg.Distributed.Enabled {
80 return nil, nil
81 }
82
83 xlog.Info("Distributed mode enabled — validating prerequisites")
84
85 // Validate distributed config (NATS URL, S3 credential pairing, durations, etc.)
86 if err := cfg.Distributed.Validate(); err != nil {
87 return nil, err
88 }
89
90 // Validate PostgreSQL is configured (auth DB must be PostgreSQL for distributed mode)
91 if !cfg.Auth.Enabled {
92 return nil, fmt.Errorf("distributed mode requires authentication to be enabled (--auth / LOCALAI_AUTH=true)")
93 }
94 if !isPostgresURL(cfg.Auth.DatabaseURL) {
95 return nil, fmt.Errorf("distributed mode requires PostgreSQL for auth database (got %q)", sanitize.URL(cfg.Auth.DatabaseURL))
96 }
97
98 // Generate instance ID if not set
99 if cfg.Distributed.InstanceID == "" {
100 cfg.Distributed.InstanceID = uuid.New().String()
101 }
102 xlog.Info("Distributed instance", "id", cfg.Distributed.InstanceID)
103
104 // Connect to NATS
105 natsAuth := cfg.Distributed.NatsAuthConfig()
106 if natsAuth.RequireAuth && (natsAuth.ServiceUserJWT == "" || natsAuth.ServiceUserSeed == "") {
107 return nil, fmt.Errorf("LOCALAI_NATS_REQUIRE_AUTH requires LOCALAI_NATS_SERVICE_JWT and LOCALAI_NATS_SERVICE_SEED")
108 }
109 natsOpts := cfg.Distributed.NatsMessagingOptions("", "")
110 natsClient, err := messaging.New(cfg.Distributed.NatsURL, natsOpts...)
111 if err != nil {
112 return nil, fmt.Errorf("connecting to NATS: %w", err)
113 }
114 xlog.Info("Connected to NATS", "url", sanitize.URL(cfg.Distributed.NatsURL))
115
116 // Ensure NATS is closed if any subsequent initialization step fails.
117 success := false
118 defer func() {
119 if !success {
120 natsClient.Close()
121 }
122 }()
123
124 // Initialize object storage
125 var store storage.ObjectStore
126 if cfg.Distributed.StorageURL != "" {
127 if cfg.Distributed.StorageBucket == "" {
128 return nil, fmt.Errorf("distributed storage bucket must be set when storage URL is configured")
129 }
130 s3Store, err := storage.NewS3Store(context.Background(), storage.S3Config{
131 Endpoint: cfg.Distributed.StorageURL,
132 Region: cfg.Distributed.StorageRegion,
133 Bucket: cfg.Distributed.StorageBucket,
134 AccessKeyID: cfg.Distributed.StorageAccessKey,
135 SecretAccessKey: cfg.Distributed.StorageSecretKey,

Callers 1

NewFunction · 0.85

Calls 15

SeedModelSchedulingMethod · 0.95
GetMethod · 0.95
ValidateMethod · 0.95
SetReplicaRemovedHookMethod · 0.95
InvalidateNodeMethod · 0.95
InvalidateMethod · 0.95
ApplyObserveMethod · 0.95
ApplyInvalidateMethod · 0.95
EvictMethod · 0.95
StagingTrackerMethod · 0.95
URLFunction · 0.92

Tested by

no test coverage detected