MCPcopy
hub / github.com/github/gh-ost / handleRowsEvent

Method handleRowsEvent

go/binlog/gomysql_reader.go:88–129  ·  view source on GitHub ↗
(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry)

Source from the content-addressed store, hash-verified

86}
87
88func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
89 currentCoords := gmr.GetCurrentBinlogCoordinates()
90 dml := ToEventDML(ev.Header.EventType.String())
91 if dml == NotDML {
92 return fmt.Errorf("unknown DML type: %s", ev.Header.EventType.String())
93 }
94 for i, row := range rowsEvent.Rows {
95 if dml == UpdateDML && i%2 == 1 {
96 // An update has two rows (WHERE+SET)
97 // We do both at the same time
98 continue
99 }
100 binlogEntry := NewBinlogEntryAt(currentCoords)
101 binlogEntry.DmlEvent = NewBinlogDMLEvent(
102 string(rowsEvent.Table.Schema),
103 string(rowsEvent.Table.Table),
104 dml,
105 )
106 switch dml {
107 case InsertDML:
108 {
109 binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
110 }
111 case UpdateDML:
112 {
113 binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
114 binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
115 }
116 case DeleteDML:
117 {
118 binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
119 }
120 }
121
122 // The channel will do the throttling. Whoever is reading from the channel
123 // decides whether action is taken synchronously (meaning we wait before
124 // next iteration) or asynchronously (we keep pushing more events)
125 // In reality, reads will be synchronous
126 entriesChannel <- binlogEntry
127 }
128 return nil
129}
130
131// StreamEvents
132func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {

Callers 1

StreamEventsMethod · 0.95

Calls 7

ToColumnValuesFunction · 0.92
ToEventDMLFunction · 0.85
NewBinlogEntryAtFunction · 0.85
NewBinlogDMLEventFunction · 0.85
StringMethod · 0.65
ErrorfMethod · 0.65

Tested by

no test coverage detected