MCPcopy
hub / github.com/github/gh-ost / initiateStreaming

Method initiateStreaming

go/logic/migrator.go:1443–1478  ·  view source on GitHub ↗

initiateStreaming begins streaming of binary log events and registers listeners for such events

()

Source from the content-addressed store, hash-verified

1441
1442// initiateStreaming begins streaming of binary log events and registers listeners for such events
1443func (mgtr *Migrator) initiateStreaming() error {
1444 mgtr.eventsStreamer = NewEventsStreamer(mgtr.migrationContext)
1445 if err := mgtr.eventsStreamer.InitDBConnections(); err != nil {
1446 return err
1447 }
1448 mgtr.eventsStreamer.AddListener(
1449 false,
1450 mgtr.migrationContext.DatabaseName,
1451 mgtr.migrationContext.GetChangelogTableName(),
1452 func(dmlEntry *binlog.BinlogEntry) error {
1453 return mgtr.onChangelogEvent(dmlEntry)
1454 },
1455 )
1456
1457 go func() {
1458 mgtr.migrationContext.Log.Debugf("Beginning streaming")
1459 err := mgtr.eventsStreamer.StreamEvents(mgtr.canStopStreaming)
1460 if err != nil {
1461 // Use helper to prevent deadlock if listenOnPanicAbort already exited
1462 _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err)
1463 }
1464 mgtr.migrationContext.Log.Debugf("Done streaming")
1465 }()
1466
1467 go func() {
1468 ticker := time.NewTicker(time.Second)
1469 defer ticker.Stop()
1470 for range ticker.C {
1471 if atomic.LoadInt64(&mgtr.finishedMigrating) > 0 {
1472 return
1473 }
1474 mgtr.migrationContext.SetRecentBinlogCoordinates(mgtr.eventsStreamer.GetCurrentBinlogCoordinates())
1475 }
1476 }()
1477 return nil
1478}
1479
1480// addDMLEventsListener begins listening for binlog events on the original table,
1481// and creates & enqueues a write task per such event.

Callers 2

MigrateMethod · 0.95
RevertMethod · 0.95

Calls 11

onChangelogEventMethod · 0.95
SendWithContextFunction · 0.92
NewEventsStreamerFunction · 0.85
AddListenerMethod · 0.80
GetChangelogTableNameMethod · 0.80
GetContextMethod · 0.80
DebugfMethod · 0.65
StreamEventsMethod · 0.65
InitDBConnectionsMethod · 0.45

Tested by

no test coverage detected