Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, and make sure the queue is drained.
()
| 910 | // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, |
| 911 | // make sure the queue is drained. |
| 912 | func (mgtr *Migrator) waitForEventsUpToLock() error { |
| 913 | timeout := time.NewTimer(time.Second * time.Duration(mgtr.migrationContext.CutOverLockTimeoutSeconds)) |
| 914 | |
| 915 | mgtr.migrationContext.MarkPointOfInterest() |
| 916 | waitForEventsUpToLockStartTime := time.Now() |
| 917 | |
| 918 | allEventsUpToLockProcessedChallenge := fmt.Sprintf("%s:%d", string(AllEventsUpToLockProcessed), waitForEventsUpToLockStartTime.UnixNano()) |
| 919 | mgtr.migrationContext.Log.Infof("Writing changelog state: %+v", allEventsUpToLockProcessedChallenge) |
| 920 | if _, err := mgtr.applier.WriteChangelogState(allEventsUpToLockProcessedChallenge); err != nil { |
| 921 | return err |
| 922 | } |
| 923 | mgtr.migrationContext.Log.Infof("Waiting for events up to lock") |
| 924 | atomic.StoreInt64(&mgtr.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1) |
| 925 | var lockProcessed *lockProcessedStruct |
| 926 | for found := false; !found; { |
| 927 | select { |
| 928 | case <-timeout.C: |
| 929 | { |
| 930 | return mgtr.migrationContext.Log.Errorf("timeout while waiting for events up to lock") |
| 931 | } |
| 932 | case lockProcessed = <-mgtr.allEventsUpToLockProcessed: |
| 933 | { |
| 934 | if lockProcessed.state == allEventsUpToLockProcessedChallenge { |
| 935 | mgtr.migrationContext.Log.Infof("Waiting for events up to lock: got %s", lockProcessed.state) |
| 936 | found = true |
| 937 | mgtr.lastLockProcessed = lockProcessed |
| 938 | } else { |
| 939 | mgtr.migrationContext.Log.Infof("Waiting for events up to lock: skipping %s", lockProcessed.state) |
| 940 | } |
| 941 | } |
| 942 | } |
| 943 | } |
| 944 | waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime) |
| 945 | |
| 946 | mgtr.migrationContext.Log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration) |
| 947 | mgtr.printStatus(ForcePrintStatusAndHintRule) |
| 948 | |
| 949 | return nil |
| 950 | } |
| 951 | |
| 952 | // cutOverTwoStep will lock down the original table, execute |
| 953 | // what's left of last DML entries, and **non-atomically** swap original->old, then new->original. |
no test coverage detected