StreamEvents
(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry)
| 130 | |
| 131 | // StreamEvents |
| 132 | func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error { |
| 133 | for !canStopStreaming() { |
| 134 | ev, err := gmr.binlogStreamer.GetEvent(context.Background()) |
| 135 | if err != nil { |
| 136 | return err |
| 137 | } |
| 138 | |
| 139 | // Update binlog coords if using file-based coords. |
| 140 | // GTID coordinates are updated on receiving GTID events. |
| 141 | if !gmr.migrationContext.UseGTIDs { |
| 142 | gmr.currentCoordinatesMutex.Lock() |
| 143 | coords := gmr.currentCoordinates.(*mysql.FileBinlogCoordinates) |
| 144 | prevCoords := coords.Clone().(*mysql.FileBinlogCoordinates) |
| 145 | coords.LogPos = int64(ev.Header.LogPos) |
| 146 | coords.EventSize = int64(ev.Header.EventSize) |
| 147 | if coords.IsLogPosOverflowBeyond4Bytes(prevCoords) { |
| 148 | gmr.currentCoordinatesMutex.Unlock() |
| 149 | return fmt.Errorf("unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", coords) |
| 150 | } |
| 151 | gmr.currentCoordinatesMutex.Unlock() |
| 152 | } |
| 153 | |
| 154 | switch event := ev.Event.(type) { |
| 155 | case *replication.GTIDEvent: |
| 156 | if !gmr.migrationContext.UseGTIDs { |
| 157 | continue |
| 158 | } |
| 159 | sid, err := uuid.FromBytes(event.SID) |
| 160 | if err != nil { |
| 161 | return err |
| 162 | } |
| 163 | gmr.currentCoordinatesMutex.Lock() |
| 164 | if gmr.LastTrxCoords != nil { |
| 165 | gmr.currentCoordinates = gmr.LastTrxCoords.Clone() |
| 166 | } |
| 167 | coords := gmr.currentCoordinates.(*mysql.GTIDBinlogCoordinates) |
| 168 | trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: event.GNO, Stop: event.GNO + 1}) |
| 169 | coords.GTIDSet.AddSet(trxGset) |
| 170 | gmr.currentCoordinatesMutex.Unlock() |
| 171 | case *replication.RotateEvent: |
| 172 | if gmr.migrationContext.UseGTIDs { |
| 173 | continue |
| 174 | } |
| 175 | gmr.currentCoordinatesMutex.Lock() |
| 176 | coords := gmr.currentCoordinates.(*mysql.FileBinlogCoordinates) |
| 177 | coords.LogFile = string(event.NextLogName) |
| 178 | gmr.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", coords.LogFile, int64(ev.Header.LogPos), event.NextLogName) |
| 179 | gmr.currentCoordinatesMutex.Unlock() |
| 180 | case *replication.XIDEvent: |
| 181 | if gmr.migrationContext.UseGTIDs { |
| 182 | gmr.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)} |
| 183 | } else { |
| 184 | gmr.LastTrxCoords = gmr.currentCoordinates.Clone() |
| 185 | } |
| 186 | case *replication.RowsEvent: |
| 187 | if err := gmr.handleRowsEvent(ev, event, entriesChannel); err != nil { |
| 188 | return err |
| 189 | } |
nothing calls this directly
no test coverage detected