StreamEvents will begin streaming events. It will be blocking, so should be executed by a goroutine
(canStopStreaming func() bool)
| 176 | // StreamEvents will begin streaming events. It will be blocking, so should be |
| 177 | // executed by a goroutine |
| 178 | func (es *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { |
| 179 | go func() { |
| 180 | for binlogEntry := range es.eventsChannel { |
| 181 | if binlogEntry.DmlEvent != nil { |
| 182 | es.notifyListeners(binlogEntry) |
| 183 | } |
| 184 | } |
| 185 | }() |
| 186 | // The next should block and execute forever, unless there's a serious error. |
| 187 | var successiveFailures int |
| 188 | var reconnectCoords mysql.BinlogCoordinates |
| 189 | ctx := es.migrationContext.GetContext() |
| 190 | for { |
| 191 | // Check for context cancellation each iteration |
| 192 | if err := ctx.Err(); err != nil { |
| 193 | return err |
| 194 | } |
| 195 | if canStopStreaming() { |
| 196 | return nil |
| 197 | } |
| 198 | // We will reconnect the binlog streamer at the coordinates |
| 199 | // of the last trx that was read completely from the streamer. |
| 200 | // Since row event application is idempotent, it's OK if we reapply some events. |
| 201 | if err := es.binlogReader.StreamEvents(canStopStreaming, es.eventsChannel); err != nil { |
| 202 | if canStopStreaming() { |
| 203 | return nil |
| 204 | } |
| 205 | |
| 206 | es.migrationContext.Log.Infof("StreamEvents encountered unexpected error: %+v", err) |
| 207 | es.migrationContext.MarkPointOfInterest() |
| 208 | time.Sleep(ReconnectStreamerSleepSeconds * time.Second) |
| 209 | |
| 210 | // See if there's retry overflow |
| 211 | if es.migrationContext.BinlogSyncerMaxReconnectAttempts > 0 && successiveFailures >= es.migrationContext.BinlogSyncerMaxReconnectAttempts { |
| 212 | return fmt.Errorf("%d successive failures in streamer reconnect at coordinates %+v", successiveFailures, reconnectCoords) |
| 213 | } |
| 214 | |
| 215 | // Reposition at same coordinates |
| 216 | if es.binlogReader.LastTrxCoords != nil { |
| 217 | reconnectCoords = es.binlogReader.LastTrxCoords.Clone() |
| 218 | } else { |
| 219 | reconnectCoords = es.initialBinlogCoordinates.Clone() |
| 220 | } |
| 221 | if !reconnectCoords.SmallerThan(es.GetCurrentBinlogCoordinates()) { |
| 222 | successiveFailures += 1 |
| 223 | } else { |
| 224 | successiveFailures = 0 |
| 225 | } |
| 226 | |
| 227 | es.migrationContext.Log.Infof("Reconnecting EventsStreamer... Will resume at %+v", reconnectCoords) |
| 228 | _ = es.binlogReader.Close() |
| 229 | if err := es.initBinlogReader(reconnectCoords); err != nil { |
| 230 | return err |
| 231 | } |
| 232 | } |
| 233 | } |
| 234 | } |
| 235 |