calculateMemoryRequirement estimates the memory requirement for batch data.
(memStore memstore.MemStore)
| 967 | |
| 968 | // calculateMemoryRequirement estimates the memory requirement for batch data. |
| 969 | func (qc *AQLQueryContext) calculateMemoryRequirement(memStore memstore.MemStore) int { |
| 970 | // keep track of max requirement for batch |
| 971 | maxBytesRequired := 0 |
| 972 | |
| 973 | //TODO(jians): hard code hll query memory requirement here for now, |
| 974 | //we can track memory usage |
| 975 | //based on table, dimensions, duration to do estimation |
| 976 | if qc.OOPK.IsHLL() { |
| 977 | return hllQueryRequiredMemoryInMB |
| 978 | } |
| 979 | |
| 980 | for _, shardID := range qc.TableScanners[0].Shards { |
| 981 | shard, err := memStore.GetTableShard(qc.Query.Table, shardID) |
| 982 | if err != nil { |
| 983 | qc.Error = utils.StackError(err, "failed to get shard %d for table %s", |
| 984 | shardID, qc.Query.Table) |
| 985 | return -1 |
| 986 | } |
| 987 | |
| 988 | var archiveStore *memstore.ArchiveStoreVersion |
| 989 | var cutoff uint32 |
| 990 | if shard.Schema.Schema.IsFactTable { |
| 991 | archiveStore = shard.ArchiveStore.GetCurrentVersion() |
| 992 | cutoff = archiveStore.ArchivingCutoff |
| 993 | } |
| 994 | |
| 995 | // estimate live batch memory usage |
| 996 | if qc.toTime == nil || cutoff < uint32(qc.toTime.Time.Unix()) { |
| 997 | batchIDs, _ := shard.LiveStore.GetBatchIDs() |
| 998 | |
| 999 | // find first non null batch and estimate. |
| 1000 | for _, batchID := range batchIDs { |
| 1001 | liveBatch := shard.LiveStore.GetBatchForRead(batchID) |
| 1002 | if liveBatch != nil { |
| 1003 | batchBytes := qc.estimateLiveBatchMemoryUsage(liveBatch) |
| 1004 | liveBatch.RUnlock() |
| 1005 | |
| 1006 | if batchBytes > maxBytesRequired { |
| 1007 | maxBytesRequired = batchBytes |
| 1008 | } |
| 1009 | break |
| 1010 | } |
| 1011 | } |
| 1012 | } |
| 1013 | |
| 1014 | // estimate archive batch memory usage |
| 1015 | if archiveStore != nil { |
| 1016 | if qc.fromTime == nil || cutoff > uint32(qc.fromTime.Time.Unix()) { |
| 1017 | scanner := qc.TableScanners[0] |
| 1018 | for batchID := scanner.ArchiveBatchIDStart; batchID < scanner.ArchiveBatchIDEnd; batchID++ { |
| 1019 | archiveBatch := archiveStore.RequestBatch(int32(batchID)) |
| 1020 | if archiveBatch == nil || archiveBatch.Size == 0 { |
| 1021 | continue |
| 1022 | } |
| 1023 | isFirstOrLast := batchID == scanner.ArchiveBatchIDStart || batchID == scanner.ArchiveBatchIDEnd-1 |
| 1024 | batchBytes := qc.estimateArchiveBatchMemoryUsage(archiveBatch, isFirstOrLast) |
| 1025 | if batchBytes > maxBytesRequired { |
| 1026 | maxBytesRequired = batchBytes |
no test coverage detected