MCPcopy
hub / github.com/redpanda-data/console / logDirsByTopic

Method logDirsByTopic

backend/pkg/console/log_dir_topic.go:139–296  ·  view source on GitHub ↗

logDirsByTopic returns a map where the topic name is the key. It returns the log dir size for each topic. nolint:gocognit,cyclop // Complexity is indeed high, but ideally this will be solved by changing the API response

(ctx context.Context)

Source from the content-addressed store, hash-verified

137//
138//nolint:gocognit,cyclop // Complexity is indeed high, but ideally this will be solved by changing the API response
139func (s *Service) logDirsByTopic(ctx context.Context) (map[string]TopicLogDirSummary, error) {
140 _, adminCl, err := s.kafkaClientFactory.GetKafkaClient(ctx)
141 if err != nil {
142 return nil, err
143 }
144
145 // 1. Retrieve metadata to know brokers hosting each replica.
146 metadata, err := adminCl.Metadata(ctx)
147 if err != nil {
148 return nil, fmt.Errorf("failed to retrieve metadata: %w", err)
149 }
150
151 // 2. Request log dirs from all brokers and deduplicate shared log dirs.
152 shardErrors := make(map[int32]kadm.ShardError)
153 describedLogDirs, err := adminCl.DescribeAllLogDirs(ctx, nil)
154 if err != nil {
155 var se *kadm.ShardErrors
156 if !errors.As(err, &se) {
157 return nil, fmt.Errorf("failed to describe log dirs: %w", err)
158 }
159
160 if se.AllFailed {
161 return nil, fmt.Errorf("failed to describe all log dirs: %w", err)
162 }
163 s.logger.WarnContext(ctx, "failed to describe log dirs from some shards", slog.Int("failed_shards", len(se.Errs)))
164 for _, shardErr := range se.Errs {
165 s.logger.WarnContext(ctx, "shard error for describing log dirs",
166 slog.Int("broker_id", int(shardErr.Broker.NodeID)),
167 slog.Any("error", shardErr.Err))
168 shardErrors[shardErr.Broker.NodeID] = shardErr
169 }
170 }
171
172 // 3. Because the described log dirs response may not report the size for every
173 // individual partition we iterate by the reported metadata. To do so, we first
174 // gather all the reported replica ids for each partition.
175 topicsSet := topicsSetWithLogDirs{}
176 metadata.Topics.EachPartition(func(detail kadm.PartitionDetail) {
177 topicsSet.Set(detail.Topic, partitionInfo{
178 Topic: detail.Topic,
179 PartitionID: detail.Partition,
180 Replicas: detail.Replicas,
181 // ReplicaLogDirs will be filled later when iterating through all
182 // broker log dirs. Each Replica must have at least one reported
183 // log dir, otherwise we must assume there was an issue for this
184 // broker - log dir combination.
185 ReplicaLogDirs: make(map[int32][]kadm.DescribedLogDirPartition),
186 ReplicaLogDirErrors: make(map[int32][]kadm.DescribedLogDir),
187 OfflineReplicas: detail.OfflineReplicas,
188 InSyncReplicas: detail.ISR,
189 Leader: detail.Leader,
190 })
191 })
192
193 // 4. Update partition info with reported log dirs and store log dir errors.
194 erroredLogDirsByBrokerID := make(map[int32][]kadm.DescribedLogDir)
195 describedLogDirs.Each(func(dir kadm.DescribedLogDir) {
196 if dir.Err != nil {

Callers 2

TestLogDirsByTopicFunction · 0.95
GetTopicsOverviewMethod · 0.95

Calls 6

EachPartitionMethod · 0.95
SetMethod · 0.95
LookupMethod · 0.95
SizeMethod · 0.80
GetKafkaClientMethod · 0.65
ErrorMethod · 0.45

Tested by 1

TestLogDirsByTopicFunction · 0.76