MCPcopy
hub / github.com/redpanda-data/console / GetTopicsOverview

Method GetTopicsOverview

backend/pkg/console/topic_overview.go:52–161  ·  view source on GitHub ↗

GetTopicsOverview returns a TopicSummary for all Kafka Topics nolint:gocognit // This function is complex by nature as it has to fetch multiple information from Kafka

(ctx context.Context)

Source from the content-addressed store, hash-verified

50//
51//nolint:gocognit // This function is complex by nature as it has to fetch multiple information from Kafka
52func (s *Service) GetTopicsOverview(ctx context.Context) ([]*TopicSummary, error) {
53 _, adminCl, err := s.kafkaClientFactory.GetKafkaClient(ctx)
54 if err != nil {
55 return nil, err
56 }
57
58 // 1. Request metadata
59 metadata, err := adminCl.Metadata(ctx)
60 if err != nil {
61 return nil, err
62 }
63
64 // 2. Extract all topicNames from metadata
65 topicNames := make([]string, 0, len(metadata.Topics))
66 for _, topic := range metadata.Topics {
67 topicName := topic.Topic
68 if topic.Err != nil {
69 s.logger.ErrorContext(ctx, "failed to get topic metadata while listing topics",
70 slog.String("topic_name", topicName),
71 slog.Any("error", topic.Err))
72 return nil, topic.Err
73 }
74
75 topicNames = append(topicNames, topicName)
76 }
77
78 // 3. Get log dir sizes & configs for each topic concurrently
79 // Use a shorter ctx timeout so that we don't wait for too long if one broker is currently down.
80 childCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
81 defer cancel()
82
83 configs := make(map[string]*TopicConfig)
84 var logDirsByTopic map[string]TopicLogDirSummary
85 var logDirErrorMsg string
86 wg := sync.WaitGroup{}
87 wg.Go(func() {
88 configs, err = s.GetTopicsConfigs(childCtx, topicNames, []string{"cleanup.policy"})
89 if err != nil {
90 s.logger.Warn("failed to fetch topic configs to return cleanup.policy", slog.Any("error", err))
91 }
92 })
93 wg.Go(func() {
94 logDirs, err := s.logDirsByTopic(childCtx)
95 if err == nil {
96 logDirsByTopic = logDirs
97 } else {
98 s.logger.Warn("failed to retrieve log dirs by topic", slog.Any("error", err))
99 logDirErrorMsg = err.Error()
100 }
101 })
102 wg.Wait()
103
104 // 4. Merge information from all requests and construct the TopicSummary object
105 res := make([]*TopicSummary, 0, len(topicNames))
106 for _, topic := range metadata.Topics {
107 policy := "N/A"
108 topicName := topic.Topic
109 if configs != nil {

Callers

nothing calls this directly

Calls 8

GetTopicsConfigsMethod · 0.95
logDirsByTopicMethod · 0.95
GetTopicDocumentationMethod · 0.95
GetConfigEntryByNameMethod · 0.80
GetKafkaClientMethod · 0.65
cancelFunction · 0.50
StringMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected