MCPcopy
hub / github.com/github/gh-ost / waitForEventsUpToLock

Method waitForEventsUpToLock

go/logic/migrator.go:912–950  ·  view source on GitHub ↗

Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, make sure the queue is drained.

()

Source from the content-addressed store, hash-verified

910// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
911// make sure the queue is drained.
912func (mgtr *Migrator) waitForEventsUpToLock() error {
913 timeout := time.NewTimer(time.Second * time.Duration(mgtr.migrationContext.CutOverLockTimeoutSeconds))
914
915 mgtr.migrationContext.MarkPointOfInterest()
916 waitForEventsUpToLockStartTime := time.Now()
917
918 allEventsUpToLockProcessedChallenge := fmt.Sprintf("%s:%d", string(AllEventsUpToLockProcessed), waitForEventsUpToLockStartTime.UnixNano())
919 mgtr.migrationContext.Log.Infof("Writing changelog state: %+v", allEventsUpToLockProcessedChallenge)
920 if _, err := mgtr.applier.WriteChangelogState(allEventsUpToLockProcessedChallenge); err != nil {
921 return err
922 }
923 mgtr.migrationContext.Log.Infof("Waiting for events up to lock")
924 atomic.StoreInt64(&mgtr.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1)
925 var lockProcessed *lockProcessedStruct
926 for found := false; !found; {
927 select {
928 case <-timeout.C:
929 {
930 return mgtr.migrationContext.Log.Errorf("timeout while waiting for events up to lock")
931 }
932 case lockProcessed = <-mgtr.allEventsUpToLockProcessed:
933 {
934 if lockProcessed.state == allEventsUpToLockProcessedChallenge {
935 mgtr.migrationContext.Log.Infof("Waiting for events up to lock: got %s", lockProcessed.state)
936 found = true
937 mgtr.lastLockProcessed = lockProcessed
938 } else {
939 mgtr.migrationContext.Log.Infof("Waiting for events up to lock: skipping %s", lockProcessed.state)
940 }
941 }
942 }
943 }
944 waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime)
945
946 mgtr.migrationContext.Log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)
947 mgtr.printStatus(ForcePrintStatusAndHintRule)
948
949 return nil
950}
951
952// cutOverTwoStep will lock down the original table, execute
953// what's left of last DML entries, and **non-atomically** swap original->old, then new->original.

Callers 1

atomicCutOverMethod · 0.95

Calls 5

printStatusMethod · 0.95
MarkPointOfInterestMethod · 0.80
WriteChangelogStateMethod · 0.80
InfofMethod · 0.65
ErrorfMethod · 0.65

Tested by

no test coverage detected