(ctx context.Context, n *node.Node, app *Application)
| 162 | } |
| 163 | |
| 164 | func syncState(ctx context.Context, n *node.Node, app *Application) error { |
| 165 | xlog.Debug("[p2p-sync] Syncing state") |
| 166 | |
| 167 | whatWeHave := []string{} |
| 168 | for _, model := range app.ModelConfigLoader().GetAllModelsConfigs() { |
| 169 | whatWeHave = append(whatWeHave, model.Name) |
| 170 | } |
| 171 | |
| 172 | ledger, _ := n.Ledger() |
| 173 | currentData := ledger.CurrentData() |
| 174 | xlog.Debug("[p2p-sync] Current data", "data", currentData) |
| 175 | data, exists := ledger.GetKey("shared_state", "models") |
| 176 | if !exists { |
| 177 | ledger.AnnounceUpdate(ctx, time.Minute, "shared_state", "models", whatWeHave) |
| 178 | xlog.Debug("No models found in the ledger, announced our models", "models", whatWeHave) |
| 179 | } |
| 180 | |
| 181 | models := []string{} |
| 182 | if err := data.Unmarshal(&models); err != nil { |
| 183 | xlog.Warn("error unmarshalling models", "error", err) |
| 184 | return nil |
| 185 | } |
| 186 | |
| 187 | xlog.Debug("[p2p-sync] Models comparison", "ourModels", whatWeHave, "ledgerModels", models) |
| 188 | |
| 189 | // Announce to the ledger any local models it does not list yet |
| 190 | whatIsNotThere := []string{} |
| 191 | for _, model := range whatWeHave { |
| 192 | if !slices.Contains(models, model) { |
| 193 | whatIsNotThere = append(whatIsNotThere, model) |
| 194 | } |
| 195 | } |
| 196 | if len(whatIsNotThere) > 0 { |
| 197 | xlog.Debug("[p2p-sync] Announcing our models", "models", append(models, whatIsNotThere...)) |
| 198 | ledger.AnnounceUpdate( |
| 199 | ctx, |
| 200 | 1*time.Minute, |
| 201 | "shared_state", |
| 202 | "models", |
| 203 | append(models, whatIsNotThere...), |
| 204 | ) |
| 205 | } |
| 206 | |
| 207 | // Install every model listed in the ledger that is not present in this instance |
| 208 | for _, model := range models { |
| 209 | if slices.Contains(whatWeHave, model) { |
| 210 | xlog.Debug("[p2p-sync] Model is already present in this instance", "model", model) |
| 211 | continue |
| 212 | } |
| 213 | |
| 214 | // we install model |
| 215 | xlog.Info("[p2p-sync] Installing model which is not present in this instance", "model", model) |
| 216 | |
| 217 | uuid, err := uuid.NewUUID() |
| 218 | if err != nil { |
| 219 | xlog.Error("error generating UUID", "error", err) |
| 220 | continue |
| 221 | } |
no test coverage detected