(eventStruct *applyEventStruct)
| 1667 | } |
| 1668 | |
| 1669 | func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { |
| 1670 | handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error { |
| 1671 | if eventStruct.writeFunc != nil { |
| 1672 | if err := mgtr.retryOperation(*eventStruct.writeFunc); err != nil { |
| 1673 | return mgtr.migrationContext.Log.Errore(err) |
| 1674 | } |
| 1675 | } |
| 1676 | return nil |
| 1677 | } |
| 1678 | if eventStruct.dmlEvent == nil { |
| 1679 | return handleNonDMLEventStruct(eventStruct) |
| 1680 | } |
| 1681 | if eventStruct.dmlEvent != nil { |
| 1682 | dmlEvents := [](*binlog.BinlogDMLEvent){} |
| 1683 | dmlEvents = append(dmlEvents, eventStruct.dmlEvent) |
| 1684 | var nonDmlStructToApply *applyEventStruct |
| 1685 | |
| 1686 | availableEvents := len(mgtr.applyEventsQueue) |
| 1687 | batchSize := int(atomic.LoadInt64(&mgtr.migrationContext.DMLBatchSize)) |
| 1688 | if availableEvents > batchSize-1 { |
| 1689 | // The "- 1" is because we already consumed one event: the original event that led to this function getting called. |
| 1690 | // So, if DMLBatchSize==1 we wish to not process any further events |
| 1691 | availableEvents = batchSize - 1 |
| 1692 | } |
| 1693 | for i := 0; i < availableEvents; i++ { |
| 1694 | additionalStruct := <-mgtr.applyEventsQueue |
| 1695 | if additionalStruct.dmlEvent == nil { |
| 1696 | // Not a DML. We don't group this, and we don't batch any further |
| 1697 | nonDmlStructToApply = additionalStruct |
| 1698 | break |
| 1699 | } |
| 1700 | dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) |
| 1701 | } |
| 1702 | // Create a task to apply the DML event; this will be execute by executeWriteFuncs() |
| 1703 | var applyEventFunc tableWriteFunc = func() error { |
| 1704 | return mgtr.applier.ApplyDMLEventQueries(dmlEvents) |
| 1705 | } |
| 1706 | if err := mgtr.retryOperation(applyEventFunc); err != nil { |
| 1707 | return mgtr.migrationContext.Log.Errore(err) |
| 1708 | } |
| 1709 | // update applier coordinates |
| 1710 | mgtr.applier.CurrentCoordinatesMutex.Lock() |
| 1711 | mgtr.applier.CurrentCoordinates = eventStruct.coords |
| 1712 | mgtr.applier.CurrentCoordinatesMutex.Unlock() |
| 1713 | |
| 1714 | if nonDmlStructToApply != nil { |
| 1715 | // We pulled DML events from the queue, and then we hit a non-DML event. Wait! |
| 1716 | // We need to handle it! |
| 1717 | if err := handleNonDMLEventStruct(nonDmlStructToApply); err != nil { |
| 1718 | return mgtr.migrationContext.Log.Errore(err) |
| 1719 | } |
| 1720 | } |
| 1721 | } |
| 1722 | return nil |
| 1723 | } |
| 1724 | |
| 1725 | // Checkpoint attempts to write a checkpoint of the Migrator's current state. |
| 1726 | // It gets the binlog coordinates of the last received trx and waits until the |
no test coverage detected