MCPcopy
hub / github.com/benbjohnson/litestream / runOnce

Method runOnce

cmd/litestream/replicate.go:394–433  ·  view source on GitHub ↗

runOnce performs one-shot replication for all databases. It syncs all databases, optionally takes snapshots, and enforces retention.

(ctx context.Context)

Source from the content-addressed store, hash-verified

392// runOnce performs one-shot replication for all databases.
393// It syncs all databases, optionally takes snapshots, and enforces retention.
394func (c *ReplicateCommand) runOnce(ctx context.Context) {
395 var err error
396 defer func() { c.execCh <- err }()
397
398 for _, db := range c.Store.DBs() {
399 slog.Info("syncing database", "path", db.Path())
400
401 // Sync the database to process any pending WAL changes.
402 if err = db.Sync(ctx); err != nil {
403 err = fmt.Errorf("sync database %s: %w", db.Path(), err)
404 return
405 }
406
407 // Sync the replica to upload any pending LTX files.
408 if err = db.Replica.Sync(ctx); err != nil {
409 err = fmt.Errorf("sync replica for %s: %w", db.Path(), err)
410 return
411 }
412
413 // Force a snapshot if requested.
414 if c.forceSnapshot {
415 slog.Info("taking snapshot", "path", db.Path())
416 if _, err = db.Snapshot(ctx); err != nil {
417 err = fmt.Errorf("snapshot %s: %w", db.Path(), err)
418 return
419 }
420 }
421
422 // Enforce retention if requested.
423 if c.enforceRetention {
424 slog.Info("enforcing retention", "path", db.Path())
425 if err = c.Store.EnforceSnapshotRetention(ctx, db); err != nil {
426 err = fmt.Errorf("enforce retention for %s: %w", db.Path(), err)
427 return
428 }
429 }
430 }
431
432 slog.Info("one-shot replication complete")
433}
434
435// Close closes all open databases.
436func (c *ReplicateCommand) Close(ctx context.Context) error {

Callers 1

RunMethod · 0.95

Calls 5

DBsMethod · 0.80
PathMethod · 0.45
SyncMethod · 0.45
SnapshotMethod · 0.45

Tested by

no test coverage detected