Backfill is the process of merging records with event time older than cutoff with archive batches.
(table string, shardID int, reporter BackfillJobDetailReporter)
| 28 | // Backfill is the process of merging records with event time older than cutoff with |
| 29 | // archive batches. |
| 30 | func (m *memStoreImpl) Backfill(table string, shardID int, reporter BackfillJobDetailReporter) error { |
| 31 | backfillTimer := utils.GetReporter(table, shardID).GetTimer(utils.BackfillTimingTotal) |
| 32 | start := utils.Now() |
| 33 | jobKey := getIdentifier(table, shardID, memCom.BackfillJobType) |
| 34 | |
| 35 | defer func() { |
| 36 | duration := utils.Now().Sub(start) |
| 37 | backfillTimer.Record(duration) |
| 38 | reporter(jobKey, func(status *BackfillJobDetail) { |
| 39 | status.LastDuration = duration |
| 40 | }) |
| 41 | utils.GetReporter(table, shardID). |
| 42 | GetCounter(utils.BackfillCount).Inc(1) |
| 43 | }() |
| 44 | |
| 45 | shard, err := m.GetTableShard(table, shardID) |
| 46 | if err != nil { |
| 47 | utils.GetLogger().With("table", table, "shard", shardID, "error", err).Warn("Failed to find shard, is it deleted?") |
| 48 | return nil |
| 49 | } |
| 50 | |
| 51 | defer shard.Users.Done() |
| 52 | |
| 53 | backfillMgr := shard.LiveStore.BackfillManager |
| 54 | backfillBatches, currentRedoFile, currentBatchOffset := backfillMgr.StartBackfill() |
| 55 | |
| 56 | // no data to backfill: checkpoint if applicable |
| 57 | if backfillBatches == nil { |
| 58 | backfillMgr.Done(currentRedoFile, currentBatchOffset, shard.metaStore) |
| 59 | reporter(jobKey, func(status *BackfillJobDetail) { |
| 60 | status.RedologFile = currentRedoFile |
| 61 | status.BatchOffset = currentBatchOffset |
| 62 | status.Current = 0 |
| 63 | status.Total = 0 |
| 64 | status.NumAffectedDays = 0 |
| 65 | status.NumRecords = 0 |
| 66 | }) |
| 67 | return nil |
| 68 | } |
| 69 | |
| 70 | backfillPatches, err := createBackfillPatches(backfillBatches, reporter, jobKey) |
| 71 | if err != nil { |
| 72 | return err |
| 73 | } |
| 74 | |
| 75 | if err = shard.createNewArchiveStoreVersionForBackfill( |
| 76 | backfillPatches, reporter, jobKey); err != nil { |
| 77 | if err == metaCom.ErrTableDoesNotExist { |
| 78 | utils.GetLogger().With("table", table, "shard", shardID).Warn("failed to create archive store version for non-exist table") |
| 79 | return nil |
| 80 | } |
| 81 | return err |
| 82 | } |
| 83 | |
| 84 | // checkpoint backfill progress |
| 85 | backfillMgr.Done(currentRedoFile, currentBatchOffset, shard.metaStore) |
| 86 | |
| 87 | // Wait for queries in other goroutines to prevent archiving from prematurely purging the old version. |
nothing calls this directly
no test coverage detected