(memStore memstore.MemStore, shardID int, previousBatchExecutor BatchExecutor)
| 160 | } |
| 161 | |
| 162 | func (qc *AQLQueryContext) processShard(memStore memstore.MemStore, shardID int, previousBatchExecutor BatchExecutor) BatchExecutor { |
| 163 | var liveRecordsProcessed, archiveRecordsProcessed, liveBatchProcessed, archiveBatchProcessed, liveBytesTransferred, archiveBytesTransferred int |
| 164 | shard, err := memStore.GetTableShard(qc.Query.Table, shardID) |
| 165 | if err != nil { |
| 166 | qc.Error = utils.StackError(err, "failed to get shard %d for table %s", |
| 167 | shardID, qc.Query.Table) |
| 168 | return previousBatchExecutor |
| 169 | } |
| 170 | defer shard.Users.Done() |
| 171 | |
| 172 | var archiveStore *memstore.ArchiveStoreVersion |
| 173 | var cutoff uint32 |
| 174 | if shard.Schema.Schema.IsFactTable { |
| 175 | archiveStore = shard.ArchiveStore.GetCurrentVersion() |
| 176 | defer archiveStore.Users.Done() |
| 177 | cutoff = archiveStore.ArchivingCutoff |
| 178 | } |
| 179 | |
| 180 | // Process live batches. |
| 181 | if qc.toTime == nil || cutoff < uint32(qc.toTime.Time.Unix()) { |
| 182 | batchIDs, numRecordsInLastBatch := shard.LiveStore.GetBatchIDs() |
| 183 | for i, batchID := range batchIDs { |
| 184 | if qc.OOPK.done { |
| 185 | break |
| 186 | } |
| 187 | batch := shard.LiveStore.GetBatchForRead(batchID) |
| 188 | if batch == nil { |
| 189 | continue |
| 190 | } |
| 191 | |
| 192 | // For now, dimension table does not persist min and max therefore |
| 193 | // we can only skip live batch for fact table. |
| 194 | // TODO: Persist min/max/numTrues when snapshotting. |
| 195 | if shard.Schema.Schema.IsFactTable && qc.shouldSkipLiveBatch(batch) { |
| 196 | batch.RUnlock() |
| 197 | qc.OOPK.LiveBatchStats.NumBatchSkipped++ |
| 198 | continue |
| 199 | } |
| 200 | |
| 201 | liveBatchProcessed++ |
| 202 | size := batch.Capacity |
| 203 | if i == len(batchIDs)-1 { |
| 204 | size = numRecordsInLastBatch |
| 205 | } |
| 206 | liveRecordsProcessed += size |
| 207 | previousBatchExecutor = qc.processBatch(&batch.Batch, |
| 208 | batchID, |
| 209 | size, |
| 210 | qc.transferLiveBatch(batch, size), |
| 211 | qc.liveBatchCustomFilterExecutor(cutoff), previousBatchExecutor, true) |
| 212 | qc.cudaStreams[0], qc.cudaStreams[1] = qc.cudaStreams[1], qc.cudaStreams[0] |
| 213 | liveBytesTransferred += qc.OOPK.currentBatch.stats.bytesTransferred |
| 214 | } |
| 215 | } |
| 216 | |
| 217 | // Process archive batches. |
| 218 | if archiveStore != nil && (qc.fromTime == nil || cutoff > uint32(qc.fromTime.Time.Unix())) { |
| 219 | scanner := qc.TableScanners[0] |
no test coverage detected