(batchIDs map[int]string)
| 70 | } |
| 71 | |
| 72 | func (batchStats *BatchStatsReporter) reportBatchStat(batchIDs map[int]string) { |
| 73 | timer := utils.GetRootReporter().GetTimer(utils.BatchSizeReportTime).Start() |
| 74 | defer timer.Stop() |
| 75 | |
| 76 | tables := batchStats.memStore.GetSchemas() |
| 77 | |
| 78 | for table, schema := range tables { |
| 79 | shards := batchStats.shardOwner.GetOwnedShards() |
| 80 | for _, shardID := range shards { |
| 81 | shard, err := batchStats.memStore.GetTableShard(table, shardID) |
| 82 | if err != nil || shard == nil { |
| 83 | continue |
| 84 | } |
| 85 | |
| 86 | for batchID, name := range batchIDs { |
| 87 | if batchID < 0 { |
| 88 | totalSize := 0 |
| 89 | liveBatchIDs, numRecordsInLastBatch := shard.LiveStore.GetBatchIDs() |
| 90 | for i, liveBatchID := range liveBatchIDs { |
| 91 | batch := shard.LiveStore.GetBatchForRead(liveBatchID) |
| 92 | if batch == nil { |
| 93 | continue |
| 94 | } |
| 95 | size := batch.Capacity |
| 96 | batch.RUnlock() |
| 97 | if i == len(batchIDs)-1 { |
| 98 | size = numRecordsInLastBatch |
| 99 | } |
| 100 | totalSize += size |
| 101 | } |
| 102 | utils.GetReporter(table, shardID).GetChildGauge(map[string]string{"time": name}, utils.BatchSize).Update(float64(totalSize)) |
| 103 | } else { |
| 104 | if !schema.Schema.IsFactTable { |
| 105 | continue |
| 106 | } |
| 107 | version := shard.ArchiveStore.GetCurrentVersion() |
| 108 | batch := version.RequestBatch(int32(batchID)) |
| 109 | size := batch.Size |
| 110 | version.Users.Done() |
| 111 | utils.GetReporter(table, shardID).GetChildGauge(map[string]string{"time": name}, utils.BatchSize).Update(float64(size)) |
| 112 | } |
| 113 | } |
| 114 | shard.Users.Done() |
| 115 | } |
| 116 | } |
| 117 | } |
no test coverage detected