( backfillPatches map[int32]*backfillPatch, reporter BackfillJobDetailReporter, jobKey string)
| 106 | } |
| 107 | |
| 108 | func (shard *TableShard) createNewArchiveStoreVersionForBackfill( |
| 109 | backfillPatches map[int32]*backfillPatch, reporter BackfillJobDetailReporter, jobKey string) (err error) { |
| 110 | // Block column deletion |
| 111 | shard.columnDeletion.Lock() |
| 112 | defer shard.columnDeletion.Unlock() |
| 113 | |
| 114 | // Snapshot schema |
| 115 | shard.Schema.RLock() |
| 116 | columnDeletions := shard.Schema.GetColumnDeletions() |
| 117 | sortColumns := shard.Schema.Schema.ArchivingSortColumns |
| 118 | primaryKeyColumns := shard.Schema.Schema.PrimaryKeyColumns |
| 119 | dataTypes := shard.Schema.ValueTypeByColumn |
| 120 | defaultValues := shard.Schema.DefaultValues |
| 121 | numColumns := len(shard.Schema.ValueTypeByColumn) |
| 122 | shard.Schema.RUnlock() |
| 123 | |
| 124 | var numAffectedDays int |
| 125 | dayIdx := 1 |
| 126 | reporter(jobKey, func(status *BackfillJobDetail) { |
| 127 | status.Stage = BackfillApplyPatch |
| 128 | status.Current = 0 |
| 129 | status.Total = len(backfillPatches) |
| 130 | }) |
| 131 | |
| 132 | lockTimer := utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID). |
| 133 | GetTimer(utils.BackfillLockTiming) |
| 134 | var totalLockDuration time.Duration |
| 135 | defer func() { |
| 136 | lockTimer.Record(totalLockDuration) |
| 137 | reporter(jobKey, func(status *BackfillJobDetail) { |
| 138 | status.LockDuration = totalLockDuration |
| 139 | }) |
| 140 | }() |
| 141 | |
| 142 | // Only those batches that are affected and changed need to be cleaned. |
| 143 | for day, patch := range backfillPatches { |
| 144 | baseBatch := shard.ArchiveStore.CurrentVersion.RequestBatch(day) |
| 145 | |
| 146 | var requestedVPs []memCom.ArchiveVectorParty |
| 147 | for columnID := 0; columnID < numColumns; columnID++ { |
| 148 | requestedVP := baseBatch.RequestVectorParty(columnID) |
| 149 | requestedVP.WaitForDiskLoad() |
| 150 | requestedVPs = append(requestedVPs, requestedVP) |
| 151 | } |
| 152 | |
| 153 | backfillCtx := newBackfillContext(baseBatch, patch, shard.Schema, columnDeletions, sortColumns, |
| 154 | primaryKeyColumns, dataTypes, defaultValues, shard.HostMemoryManager) |
| 155 | |
| 156 | // Real backfill implementation. |
| 157 | if err = backfillCtx.backfill(reporter, jobKey); err != nil { |
| 158 | UnpinVectorParties(requestedVPs) |
| 159 | backfillCtx.release() |
| 160 | return |
| 161 | } |
| 162 | |
| 163 | if backfillCtx.okForEarlyUnpin { |
| 164 | UnpinVectorParties(requestedVPs) |
| 165 | } |
no test coverage detected