MCPcopy
hub / github.com/github/gh-ost / StreamEvents

Method StreamEvents

go/logic/streamer.go:178–234  ·  view source on GitHub ↗

StreamEvents begins streaming events. It blocks, so it should be executed in a goroutine.

(canStopStreaming func() bool)

Source from the content-addressed store, hash-verified

176// StreamEvents will begin streaming events. It will be blocking, so should be
177// executed by a goroutine
178func (es *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
179 go func() {
180 for binlogEntry := range es.eventsChannel {
181 if binlogEntry.DmlEvent != nil {
182 es.notifyListeners(binlogEntry)
183 }
184 }
185 }()
186 // The next should block and execute forever, unless there's a serious error.
187 var successiveFailures int
188 var reconnectCoords mysql.BinlogCoordinates
189 ctx := es.migrationContext.GetContext()
190 for {
191 // Check for context cancellation each iteration
192 if err := ctx.Err(); err != nil {
193 return err
194 }
195 if canStopStreaming() {
196 return nil
197 }
198 // We will reconnect the binlog streamer at the coordinates
199 // of the last trx that was read completely from the streamer.
200 // Since row event application is idempotent, it's OK if we reapply some events.
201 if err := es.binlogReader.StreamEvents(canStopStreaming, es.eventsChannel); err != nil {
202 if canStopStreaming() {
203 return nil
204 }
205
206 es.migrationContext.Log.Infof("StreamEvents encountered unexpected error: %+v", err)
207 es.migrationContext.MarkPointOfInterest()
208 time.Sleep(ReconnectStreamerSleepSeconds * time.Second)
209
210 // See if there's retry overflow
211 if es.migrationContext.BinlogSyncerMaxReconnectAttempts > 0 && successiveFailures >= es.migrationContext.BinlogSyncerMaxReconnectAttempts {
212 return fmt.Errorf("%d successive failures in streamer reconnect at coordinates %+v", successiveFailures, reconnectCoords)
213 }
214
215 // Reposition at same coordinates
216 if es.binlogReader.LastTrxCoords != nil {
217 reconnectCoords = es.binlogReader.LastTrxCoords.Clone()
218 } else {
219 reconnectCoords = es.initialBinlogCoordinates.Clone()
220 }
221 if !reconnectCoords.SmallerThan(es.GetCurrentBinlogCoordinates()) {
222 successiveFailures += 1
223 } else {
224 successiveFailures = 0
225 }
226
227 es.migrationContext.Log.Infof("Reconnecting EventsStreamer... Will resume at %+v", reconnectCoords)
228 _ = es.binlogReader.Close()
229 if err := es.initBinlogReader(reconnectCoords); err != nil {
230 return err
231 }
232 }
233 }
234}
235

Callers 2

TestStreamEventsMethod · 0.95

Calls 11

notifyListenersMethod · 0.95
SmallerThanMethod · 0.95
initBinlogReaderMethod · 0.95
GetContextMethod · 0.80
MarkPointOfInterestMethod · 0.80
StreamEventsMethod · 0.65
InfofMethod · 0.65
ErrorfMethod · 0.65
CloneMethod · 0.65
CloseMethod · 0.45

Tested by 2

TestStreamEventsMethod · 0.76