runOnce performs one-shot replication for all databases. It syncs all databases, optionally takes snapshots, and enforces retention.
(ctx context.Context)
| 392 | // runOnce performs one-shot replication for all databases. |
| 393 | // It syncs all databases, optionally takes snapshots, and enforces retention. |
| 394 | func (c *ReplicateCommand) runOnce(ctx context.Context) { |
| 395 | var err error |
| 396 | defer func() { c.execCh <- err }() |
| 397 | |
| 398 | for _, db := range c.Store.DBs() { |
| 399 | slog.Info("syncing database", "path", db.Path()) |
| 400 | |
| 401 | // Sync the database to process any pending WAL changes. |
| 402 | if err = db.Sync(ctx); err != nil { |
| 403 | err = fmt.Errorf("sync database %s: %w", db.Path(), err) |
| 404 | return |
| 405 | } |
| 406 | |
| 407 | // Sync the replica to upload any pending LTX files. |
| 408 | if err = db.Replica.Sync(ctx); err != nil { |
| 409 | err = fmt.Errorf("sync replica for %s: %w", db.Path(), err) |
| 410 | return |
| 411 | } |
| 412 | |
| 413 | // Force a snapshot if requested. |
| 414 | if c.forceSnapshot { |
| 415 | slog.Info("taking snapshot", "path", db.Path()) |
| 416 | if _, err = db.Snapshot(ctx); err != nil { |
| 417 | err = fmt.Errorf("snapshot %s: %w", db.Path(), err) |
| 418 | return |
| 419 | } |
| 420 | } |
| 421 | |
| 422 | // Enforce retention if requested. |
| 423 | if c.enforceRetention { |
| 424 | slog.Info("enforcing retention", "path", db.Path()) |
| 425 | if err = c.Store.EnforceSnapshotRetention(ctx, db); err != nil { |
| 426 | err = fmt.Errorf("enforce retention for %s: %w", db.Path(), err) |
| 427 | return |
| 428 | } |
| 429 | } |
| 430 | } |
| 431 | |
| 432 | slog.Info("one-shot replication complete") |
| 433 | } |
| 434 | |
| 435 | // Close closes all open databases. |
| 436 | func (c *ReplicateCommand) Close(ctx context.Context) error { |