MCPcopy
hub / github.com/github/gh-ost / onApplyEventStruct

Method onApplyEventStruct

go/logic/migrator.go:1669–1723  ·  view source on GitHub ↗
(eventStruct *applyEventStruct)

Source from the content-addressed store, hash-verified

1667}
1668
1669func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
1670 handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error {
1671 if eventStruct.writeFunc != nil {
1672 if err := mgtr.retryOperation(*eventStruct.writeFunc); err != nil {
1673 return mgtr.migrationContext.Log.Errore(err)
1674 }
1675 }
1676 return nil
1677 }
1678 if eventStruct.dmlEvent == nil {
1679 return handleNonDMLEventStruct(eventStruct)
1680 }
1681 if eventStruct.dmlEvent != nil {
1682 dmlEvents := [](*binlog.BinlogDMLEvent){}
1683 dmlEvents = append(dmlEvents, eventStruct.dmlEvent)
1684 var nonDmlStructToApply *applyEventStruct
1685
1686 availableEvents := len(mgtr.applyEventsQueue)
1687 batchSize := int(atomic.LoadInt64(&mgtr.migrationContext.DMLBatchSize))
1688 if availableEvents > batchSize-1 {
1689 // The "- 1" is because we already consumed one event: the original event that led to this function getting called.
1690 // So, if DMLBatchSize==1 we wish to not process any further events
1691 availableEvents = batchSize - 1
1692 }
1693 for i := 0; i < availableEvents; i++ {
1694 additionalStruct := <-mgtr.applyEventsQueue
1695 if additionalStruct.dmlEvent == nil {
1696 // Not a DML. We don't group this, and we don't batch any further
1697 nonDmlStructToApply = additionalStruct
1698 break
1699 }
1700 dmlEvents = append(dmlEvents, additionalStruct.dmlEvent)
1701 }
1702 // Create a task to apply the DML event; this will be execute by executeWriteFuncs()
1703 var applyEventFunc tableWriteFunc = func() error {
1704 return mgtr.applier.ApplyDMLEventQueries(dmlEvents)
1705 }
1706 if err := mgtr.retryOperation(applyEventFunc); err != nil {
1707 return mgtr.migrationContext.Log.Errore(err)
1708 }
1709 // update applier coordinates
1710 mgtr.applier.CurrentCoordinatesMutex.Lock()
1711 mgtr.applier.CurrentCoordinates = eventStruct.coords
1712 mgtr.applier.CurrentCoordinatesMutex.Unlock()
1713
1714 if nonDmlStructToApply != nil {
1715 // We pulled DML events from the queue, and then we hit a non-DML event. Wait!
1716 // We need to handle it!
1717 if err := handleNonDMLEventStruct(nonDmlStructToApply); err != nil {
1718 return mgtr.migrationContext.Log.Errore(err)
1719 }
1720 }
1721 }
1722 return nil
1723}
1724
1725// Checkpoint attempts to write a checkpoint of the Migrator's current state.
1726// It gets the binlog coordinates of the last received trx and waits until the

Callers 2

executeWriteFuncsMethod · 0.95
executeDMLWriteFuncsMethod · 0.95

Calls 3

retryOperationMethod · 0.95
ApplyDMLEventQueriesMethod · 0.80
ErroreMethod · 0.65

Tested by

no test coverage detected