MCPcopy
hub / github.com/github/gh-ost / TestStreamEvents

Method TestStreamEvents

go/logic/streamer_test.go:66–141  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

64}
65
66func (suite *EventsStreamerTestSuite) TestStreamEvents() {
67 ctx := context.Background()
68
69 _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", getTestTableName()))
70 suite.Require().NoError(err)
71
72 connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
73 suite.Require().NoError(err)
74
75 migrationContext := newTestMigrationContext()
76 migrationContext.ApplierConnectionConfig = connectionConfig
77 migrationContext.InspectorConnectionConfig = connectionConfig
78 migrationContext.SetConnectionConfig("innodb")
79
80 streamer := NewEventsStreamer(migrationContext)
81
82 err = streamer.InitDBConnections()
83 suite.Require().NoError(err)
84 defer streamer.Close()
85 defer streamer.Teardown()
86
87 streamCtx, cancel := context.WithCancel(context.Background())
88
89 dmlEvents := make([]*binlog.BinlogDMLEvent, 0)
90 err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogEntry) error {
91 dmlEvents = append(dmlEvents, event.DmlEvent)
92
93 // Stop once we've collected three events
94 if len(dmlEvents) == 3 {
95 cancel()
96 }
97
98 return nil
99 })
100 suite.Require().NoError(err)
101
102 group := errgroup.Group{}
103 group.Go(func() error {
104 //nolint:contextcheck
105 return streamer.StreamEvents(func() bool {
106 return streamCtx.Err() != nil
107 })
108 })
109
110 group.Go(func() error {
111 var err error
112
113 _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, name) VALUES (1, 'foo')", getTestTableName()))
114 if err != nil {
115 return err
116 }
117
118 _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, name) VALUES (2, 'bar')", getTestTableName()))
119 if err != nil {
120 return err
121 }
122
123 _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, name) VALUES (3, 'baz')", getTestTableName()))

Callers

nothing calls this directly

Calls 11

InitDBConnectionsMethod · 0.95
CloseMethod · 0.95
TeardownMethod · 0.95
AddListenerMethod · 0.95
StreamEventsMethod · 0.95
getTestTableNameFunction · 0.85
getTestConnectionConfigFunction · 0.85
newTestMigrationContextFunction · 0.85
NewEventsStreamerFunction · 0.85
SetConnectionConfigMethod · 0.80
LenMethod · 0.45

Tested by

no test coverage detected