executeWriteFuncs writes data via applier: both the rowcopy and the events backlog. This is where the ghost table gets the data. The function fills the data single-threaded. Both event backlog and rowcopy events are polled; the backlog events have precedence.
()
| 1822 | // This is where the ghost table gets the data. The function fills the data single-threaded. |
| 1823 | // Both event backlog and rowcopy events are polled; the backlog events have precedence. |
| 1824 | func (mgtr *Migrator) executeWriteFuncs() error { |
| 1825 | if mgtr.migrationContext.Noop { |
| 1826 | mgtr.migrationContext.Log.Debugf("Noop operation; not really executing write funcs") |
| 1827 | return nil |
| 1828 | } |
| 1829 | for { |
| 1830 | if err := mgtr.checkAbort(); err != nil { |
| 1831 | return err |
| 1832 | } |
| 1833 | if atomic.LoadInt64(&mgtr.finishedMigrating) > 0 { |
| 1834 | return nil |
| 1835 | } |
| 1836 | |
| 1837 | mgtr.throttler.throttle(nil) |
| 1838 | |
| 1839 | // We give higher priority to event processing, then secondary priority to |
| 1840 | // rowcopy |
| 1841 | select { |
| 1842 | case eventStruct := <-mgtr.applyEventsQueue: |
| 1843 | { |
| 1844 | if err := mgtr.onApplyEventStruct(eventStruct); err != nil { |
| 1845 | return err |
| 1846 | } |
| 1847 | } |
| 1848 | default: |
| 1849 | { |
| 1850 | select { |
| 1851 | case copyRowsFunc := <-mgtr.copyRowsQueue: |
| 1852 | { |
| 1853 | copyRowsStartTime := time.Now() |
| 1854 | // Retries are handled within the copyRowsFunc |
| 1855 | if err := copyRowsFunc(); err != nil { |
| 1856 | return mgtr.migrationContext.Log.Errore(err) |
| 1857 | } |
| 1858 | if niceRatio := mgtr.migrationContext.GetNiceRatio(); niceRatio > 0 { |
| 1859 | copyRowsDuration := time.Since(copyRowsStartTime) |
| 1860 | sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds()) |
| 1861 | sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond |
| 1862 | time.Sleep(sleepTime) |
| 1863 | } |
| 1864 | } |
| 1865 | default: |
| 1866 | { |
| 1867 | // Hmmmmm... nothing in the queue; no events, but also no row copy. |
| 1868 | // This is possible upon load. Let's just sleep it over. |
| 1869 | mgtr.migrationContext.Log.Debugf("Getting nothing in the write queue. Sleeping...") |
| 1870 | time.Sleep(time.Second) |
| 1871 | } |
| 1872 | } |
| 1873 | } |
| 1874 | } |
| 1875 | } |
| 1876 | } |
| 1877 | |
| 1878 | func (mgtr *Migrator) executeDMLWriteFuncs() error { |
| 1879 | if mgtr.migrationContext.Noop { |
no test coverage detected