MCPcopy
hub / github.com/uber/aresdb / processShard

Method processShard

query/aql_processor.go:162–251  ·  view source on GitHub ↗
(memStore memstore.MemStore, shardID int, previousBatchExecutor BatchExecutor)

Source from the content-addressed store, hash-verified

160}
161
162func (qc *AQLQueryContext) processShard(memStore memstore.MemStore, shardID int, previousBatchExecutor BatchExecutor) BatchExecutor {
163 var liveRecordsProcessed, archiveRecordsProcessed, liveBatchProcessed, archiveBatchProcessed, liveBytesTransferred, archiveBytesTransferred int
164 shard, err := memStore.GetTableShard(qc.Query.Table, shardID)
165 if err != nil {
166 qc.Error = utils.StackError(err, "failed to get shard %d for table %s",
167 shardID, qc.Query.Table)
168 return previousBatchExecutor
169 }
170 defer shard.Users.Done()
171
172 var archiveStore *memstore.ArchiveStoreVersion
173 var cutoff uint32
174 if shard.Schema.Schema.IsFactTable {
175 archiveStore = shard.ArchiveStore.GetCurrentVersion()
176 defer archiveStore.Users.Done()
177 cutoff = archiveStore.ArchivingCutoff
178 }
179
180 // Process live batches.
181 if qc.toTime == nil || cutoff < uint32(qc.toTime.Time.Unix()) {
182 batchIDs, numRecordsInLastBatch := shard.LiveStore.GetBatchIDs()
183 for i, batchID := range batchIDs {
184 if qc.OOPK.done {
185 break
186 }
187 batch := shard.LiveStore.GetBatchForRead(batchID)
188 if batch == nil {
189 continue
190 }
191
192 // For now, dimension table does not persist min and max therefore
193 // we can only skip live batch for fact table.
194 // TODO: Persist min/max/numTrues when snapshotting.
195 if shard.Schema.Schema.IsFactTable && qc.shouldSkipLiveBatch(batch) {
196 batch.RUnlock()
197 qc.OOPK.LiveBatchStats.NumBatchSkipped++
198 continue
199 }
200
201 liveBatchProcessed++
202 size := batch.Capacity
203 if i == len(batchIDs)-1 {
204 size = numRecordsInLastBatch
205 }
206 liveRecordsProcessed += size
207 previousBatchExecutor = qc.processBatch(&batch.Batch,
208 batchID,
209 size,
210 qc.transferLiveBatch(batch, size),
211 qc.liveBatchCustomFilterExecutor(cutoff), previousBatchExecutor, true)
212 qc.cudaStreams[0], qc.cudaStreams[1] = qc.cudaStreams[1], qc.cudaStreams[0]
213 liveBytesTransferred += qc.OOPK.currentBatch.stats.bytesTransferred
214 }
215 }
216
217 // Process archive batches.
218 if archiveStore != nil && (qc.fromTime == nil || cutoff > uint32(qc.fromTime.Time.Unix())) {
219 scanner := qc.TableScanners[0]

Callers 1

ProcessQueryMethod · 0.95

Calls 15

shouldSkipLiveBatchMethod · 0.95
processBatchMethod · 0.95
transferLiveBatchMethod · 0.95
RequestBatchMethod · 0.95
transferArchiveBatchMethod · 0.95
StackErrorFunction · 0.92
GetReporterFunction · 0.92
GetCurrentVersionMethod · 0.80
GetBatchIDsMethod · 0.80
GetCounterMethod · 0.80

Tested by

no test coverage detected