initiateStreaming begins streaming of binary log events and registers listeners for such events
()
| 1441 | |
| 1442 | // initiateStreaming begins streaming of binary log events and registers listeners for such events |
| 1443 | func (mgtr *Migrator) initiateStreaming() error { |
| 1444 | mgtr.eventsStreamer = NewEventsStreamer(mgtr.migrationContext) |
| 1445 | if err := mgtr.eventsStreamer.InitDBConnections(); err != nil { |
| 1446 | return err |
| 1447 | } |
| 1448 | mgtr.eventsStreamer.AddListener( |
| 1449 | false, |
| 1450 | mgtr.migrationContext.DatabaseName, |
| 1451 | mgtr.migrationContext.GetChangelogTableName(), |
| 1452 | func(dmlEntry *binlog.BinlogEntry) error { |
| 1453 | return mgtr.onChangelogEvent(dmlEntry) |
| 1454 | }, |
| 1455 | ) |
| 1456 | |
| 1457 | go func() { |
| 1458 | mgtr.migrationContext.Log.Debugf("Beginning streaming") |
| 1459 | err := mgtr.eventsStreamer.StreamEvents(mgtr.canStopStreaming) |
| 1460 | if err != nil { |
| 1461 | // Use helper to prevent deadlock if listenOnPanicAbort already exited |
| 1462 | _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) |
| 1463 | } |
| 1464 | mgtr.migrationContext.Log.Debugf("Done streaming") |
| 1465 | }() |
| 1466 | |
| 1467 | go func() { |
| 1468 | ticker := time.NewTicker(time.Second) |
| 1469 | defer ticker.Stop() |
| 1470 | for range ticker.C { |
| 1471 | if atomic.LoadInt64(&mgtr.finishedMigrating) > 0 { |
| 1472 | return |
| 1473 | } |
| 1474 | mgtr.migrationContext.SetRecentBinlogCoordinates(mgtr.eventsStreamer.GetCurrentBinlogCoordinates()) |
| 1475 | } |
| 1476 | }() |
| 1477 | return nil |
| 1478 | } |
| 1479 | |
| 1480 | // addDMLEventsListener begins listening for binlog events on the original table, |
| 1481 | // and creates & enqueues a write task per such event. |
no test coverage detected