MCPcopy
hub / github.com/github/gh-ost / StreamEvents

Method StreamEvents

go/binlog/gomysql_reader.go:132–195  ·  view source on GitHub ↗

StreamEvents

(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry)

Source from the content-addressed store, hash-verified

130
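131// StreamEvents reads binlog events in a loop until canStopStreaming returns true, updating the reader's current coordinates as events arrive and passing rows events to handleRowsEvent together with entriesChannel; it returns the first error encountered.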
132func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
133 for !canStopStreaming() {
134 ev, err := gmr.binlogStreamer.GetEvent(context.Background())
135 if err != nil {
136 return err
137 }
138
139 // Update binlog coords if using file-based coords.
140 // GTID coordinates are updated on receiving GTID events.
141 if !gmr.migrationContext.UseGTIDs {
142 gmr.currentCoordinatesMutex.Lock()
143 coords := gmr.currentCoordinates.(*mysql.FileBinlogCoordinates)
144 prevCoords := coords.Clone().(*mysql.FileBinlogCoordinates)
145 coords.LogPos = int64(ev.Header.LogPos)
146 coords.EventSize = int64(ev.Header.EventSize)
147 if coords.IsLogPosOverflowBeyond4Bytes(prevCoords) {
148 gmr.currentCoordinatesMutex.Unlock()
149 return fmt.Errorf("unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", coords)
150 }
151 gmr.currentCoordinatesMutex.Unlock()
152 }
153
154 switch event := ev.Event.(type) {
155 case *replication.GTIDEvent:
156 if !gmr.migrationContext.UseGTIDs {
157 continue
158 }
159 sid, err := uuid.FromBytes(event.SID)
160 if err != nil {
161 return err
162 }
163 gmr.currentCoordinatesMutex.Lock()
164 if gmr.LastTrxCoords != nil {
165 gmr.currentCoordinates = gmr.LastTrxCoords.Clone()
166 }
167 coords := gmr.currentCoordinates.(*mysql.GTIDBinlogCoordinates)
168 trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: event.GNO, Stop: event.GNO + 1})
169 coords.GTIDSet.AddSet(trxGset)
170 gmr.currentCoordinatesMutex.Unlock()
171 case *replication.RotateEvent:
172 if gmr.migrationContext.UseGTIDs {
173 continue
174 }
175 gmr.currentCoordinatesMutex.Lock()
176 coords := gmr.currentCoordinates.(*mysql.FileBinlogCoordinates)
177 coords.LogFile = string(event.NextLogName)
178 gmr.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", coords.LogFile, int64(ev.Header.LogPos), event.NextLogName)
179 gmr.currentCoordinatesMutex.Unlock()
180 case *replication.XIDEvent:
181 if gmr.migrationContext.UseGTIDs {
182 gmr.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)}
183 } else {
184 gmr.LastTrxCoords = gmr.currentCoordinates.Clone()
185 }
186 case *replication.RowsEvent:
187 if err := gmr.handleRowsEvent(ev, event, entriesChannel); err != nil {
188 return err
189 }

Callers

nothing calls this directly

Calls 6

handleRowsEventMethod · 0.95
CloneMethod · 0.65
ErrorfMethod · 0.65
InfofMethod · 0.65
DebugfMethod · 0.65

Tested by

no test coverage detected