MCPcopy
hub / github.com/uber/aresdb / createNewArchiveStoreVersionForBackfill

Method createNewArchiveStoreVersionForBackfill

memstore/backfill.go:108–266  ·  view source on GitHub ↗
(
	backfillPatches map[int32]*backfillPatch, reporter BackfillJobDetailReporter, jobKey string)

Source from the content-addressed store, hash-verified

106}
107
108func (shard *TableShard) createNewArchiveStoreVersionForBackfill(
109 backfillPatches map[int32]*backfillPatch, reporter BackfillJobDetailReporter, jobKey string) (err error) {
110 // Block column deletion
111 shard.columnDeletion.Lock()
112 defer shard.columnDeletion.Unlock()
113
114 // Snapshot schema
115 shard.Schema.RLock()
116 columnDeletions := shard.Schema.GetColumnDeletions()
117 sortColumns := shard.Schema.Schema.ArchivingSortColumns
118 primaryKeyColumns := shard.Schema.Schema.PrimaryKeyColumns
119 dataTypes := shard.Schema.ValueTypeByColumn
120 defaultValues := shard.Schema.DefaultValues
121 numColumns := len(shard.Schema.ValueTypeByColumn)
122 shard.Schema.RUnlock()
123
124 var numAffectedDays int
125 dayIdx := 1
126 reporter(jobKey, func(status *BackfillJobDetail) {
127 status.Stage = BackfillApplyPatch
128 status.Current = 0
129 status.Total = len(backfillPatches)
130 })
131
132 lockTimer := utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).
133 GetTimer(utils.BackfillLockTiming)
134 var totalLockDuration time.Duration
135 defer func() {
136 lockTimer.Record(totalLockDuration)
137 reporter(jobKey, func(status *BackfillJobDetail) {
138 status.LockDuration = totalLockDuration
139 })
140 }()
141
142 // Only those batches that are affected and changed need to be cleaned.
143 for day, patch := range backfillPatches {
144 baseBatch := shard.ArchiveStore.CurrentVersion.RequestBatch(day)
145
146 var requestedVPs []memCom.ArchiveVectorParty
147 for columnID := 0; columnID < numColumns; columnID++ {
148 requestedVP := baseBatch.RequestVectorParty(columnID)
149 requestedVP.WaitForDiskLoad()
150 requestedVPs = append(requestedVPs, requestedVP)
151 }
152
153 backfillCtx := newBackfillContext(baseBatch, patch, shard.Schema, columnDeletions, sortColumns,
154 primaryKeyColumns, dataTypes, defaultValues, shard.HostMemoryManager)
155
156 // Real backfill implementation.
157 if err = backfillCtx.backfill(reporter, jobKey); err != nil {
158 UnpinVectorParties(requestedVPs)
159 backfillCtx.release()
160 return
161 }
162
163 if backfillCtx.okForEarlyUnpin {
164 UnpinVectorParties(requestedVPs)
165 }

Callers 2

Backfill (Method) · 0.80
backfill_test.go (File) · 0.80

Calls 15

GetReporter (Function) · 0.92
Now (Function) · 0.92
newBackfillContext (Function) · 0.85
UnpinVectorParties (Function) · 0.85
NewArchiveStoreVersion (Function) · 0.85
GetColumnDeletions (Method) · 0.80
GetTimer (Method) · 0.80
RequestBatch (Method) · 0.80
RequestVectorParty (Method) · 0.80
backfill (Method) · 0.80
release (Method) · 0.80
WriteToDisk (Method) · 0.80

Tested by

no test coverage detected