GetTopicsOverview returns a TopicSummary for each Kafka topic. nolint:gocognit // This function is complex by nature, as it has to fetch multiple pieces of information from Kafka.
(ctx context.Context)
| 50 | // |
| 51 | //nolint:gocognit // This function is complex by nature, as it has to fetch multiple pieces of information from Kafka. |
| 52 | func (s *Service) GetTopicsOverview(ctx context.Context) ([]*TopicSummary, error) { |
| 53 | _, adminCl, err := s.kafkaClientFactory.GetKafkaClient(ctx) |
| 54 | if err != nil { |
| 55 | return nil, err |
| 56 | } |
| 57 | |
| 58 | // 1. Request metadata |
| 59 | metadata, err := adminCl.Metadata(ctx) |
| 60 | if err != nil { |
| 61 | return nil, err |
| 62 | } |
| 63 | |
| 64 | // 2. Extract all topicNames from metadata |
| 65 | topicNames := make([]string, 0, len(metadata.Topics)) |
| 66 | for _, topic := range metadata.Topics { |
| 67 | topicName := topic.Topic |
| 68 | if topic.Err != nil { |
| 69 | s.logger.ErrorContext(ctx, "failed to get topic metadata while listing topics", |
| 70 | slog.String("topic_name", topicName), |
| 71 | slog.Any("error", topic.Err)) |
| 72 | return nil, topic.Err |
| 73 | } |
| 74 | |
| 75 | topicNames = append(topicNames, topicName) |
| 76 | } |
| 77 | |
| 78 | // 3. Get log dir sizes & configs for each topic concurrently |
| 79 | // Use a shorter ctx timeout so that we don't wait for too long if one broker is currently down. |
| 80 | childCtx, cancel := context.WithTimeout(ctx, 5*time.Second) |
| 81 | defer cancel() |
| 82 | |
| 83 | configs := make(map[string]*TopicConfig) |
| 84 | var logDirsByTopic map[string]TopicLogDirSummary |
| 85 | var logDirErrorMsg string |
| 86 | wg := sync.WaitGroup{} |
| 87 | wg.Go(func() { |
| 88 | configs, err = s.GetTopicsConfigs(childCtx, topicNames, []string{"cleanup.policy"}) |
| 89 | if err != nil { |
| 90 | s.logger.Warn("failed to fetch topic configs to return cleanup.policy", slog.Any("error", err)) |
| 91 | } |
| 92 | }) |
| 93 | wg.Go(func() { |
| 94 | logDirs, err := s.logDirsByTopic(childCtx) |
| 95 | if err == nil { |
| 96 | logDirsByTopic = logDirs |
| 97 | } else { |
| 98 | s.logger.Warn("failed to retrieve log dirs by topic", slog.Any("error", err)) |
| 99 | logDirErrorMsg = err.Error() |
| 100 | } |
| 101 | }) |
| 102 | wg.Wait() |
| 103 | |
| 104 | // 4. Merge information from all requests and construct the TopicSummary object |
| 105 | res := make([]*TopicSummary, 0, len(topicNames)) |
| 106 | for _, topic := range metadata.Topics { |
| 107 | policy := "N/A" |
| 108 | topicName := topic.Topic |
| 109 | if configs != nil { |
nothing calls this directly
no test coverage detected