(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry)
| 86 | } |
| 87 | |
| 88 | func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error { |
| 89 | currentCoords := gmr.GetCurrentBinlogCoordinates() |
| 90 | dml := ToEventDML(ev.Header.EventType.String()) |
| 91 | if dml == NotDML { |
| 92 | return fmt.Errorf("unknown DML type: %s", ev.Header.EventType.String()) |
| 93 | } |
| 94 | for i, row := range rowsEvent.Rows { |
| 95 | if dml == UpdateDML && i%2 == 1 { |
| 96 | // An update has two rows (WHERE+SET) |
| 97 | // We do both at the same time |
| 98 | continue |
| 99 | } |
| 100 | binlogEntry := NewBinlogEntryAt(currentCoords) |
| 101 | binlogEntry.DmlEvent = NewBinlogDMLEvent( |
| 102 | string(rowsEvent.Table.Schema), |
| 103 | string(rowsEvent.Table.Table), |
| 104 | dml, |
| 105 | ) |
| 106 | switch dml { |
| 107 | case InsertDML: |
| 108 | { |
| 109 | binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row) |
| 110 | } |
| 111 | case UpdateDML: |
| 112 | { |
| 113 | binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row) |
| 114 | binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1]) |
| 115 | } |
| 116 | case DeleteDML: |
| 117 | { |
| 118 | binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row) |
| 119 | } |
| 120 | } |
| 121 | |
| 122 | // The channel will do the throttling. Whoever is reading from the channel |
| 123 | // decides whether action is taken synchronously (meaning we wait before |
| 124 | // next iteration) or asynchronously (we keep pushing more events) |
| 125 | // In reality, reads will be synchronous |
| 126 | entriesChannel <- binlogEntry |
| 127 | } |
| 128 | return nil |
| 129 | } |
| 130 | |
| 131 | // StreamEvents |
| 132 | func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error { |
no test coverage detected