consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then consumes and drops any further incoming events that may be left hanging.
()
| 234 | // consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then |
| 235 | // consumes and drops any further incoming events that may be left hanging. |
| 236 | func (mgtr *Migrator) consumeRowCopyComplete() { |
| 237 | select { |
| 238 | case err := <-mgtr.rowCopyComplete: |
| 239 | if err != nil { |
| 240 | // Abort synchronously to ensure checkAbort() sees the error immediately |
| 241 | mgtr.abort(err) |
| 242 | // Don't mark row copy as complete if there was an error |
| 243 | return |
| 244 | } |
| 245 | case <-mgtr.migrationContext.GetContext().Done(): |
| 246 | // Abort cancelled the context |
| 247 | return |
| 248 | } |
| 249 | atomic.StoreInt64(&mgtr.rowCopyCompleteFlag, 1) |
| 250 | mgtr.migrationContext.MarkRowCopyEndTime() |
| 251 | go func() { |
| 252 | for err := range mgtr.rowCopyComplete { |
| 253 | if err != nil { |
| 254 | // Abort synchronously to ensure the error is stored immediately |
| 255 | mgtr.abort(err) |
| 256 | return |
| 257 | } |
| 258 | } |
| 259 | }() |
| 260 | } |
| 261 | |
| 262 | func (mgtr *Migrator) canStopStreaming() bool { |
| 263 | return atomic.LoadInt64(&mgtr.migrationContext.CutOverCompleteFlag) != 0 |