DeleteTopic deletes a Kafka Topic (if possible and not disabled).
(ctx context.Context, topicName string)
| 22 | |
| 23 | // DeleteTopic deletes a Kafka Topic (if possible and not disabled). |
| 24 | func (s *Service) DeleteTopic(ctx context.Context, topicName string) *rest.Error { |
| 25 | cl, _, err := s.kafkaClientFactory.GetKafkaClient(ctx) |
| 26 | if err != nil { |
| 27 | return errorToRestError(err) |
| 28 | } |
| 29 | |
| 30 | req := kmsg.NewDeleteTopicsRequest() |
| 31 | req.TopicNames = []string{topicName} |
| 32 | req.Topics = []kmsg.DeleteTopicsRequestTopic{ |
| 33 | { |
| 34 | Topic: new(topicName), |
| 35 | }, |
| 36 | } |
| 37 | req.TimeoutMillis = 30 * 1000 // 30s |
| 38 | |
| 39 | res, err := req.RequestWith(ctx, cl) |
| 40 | if err != nil { |
| 41 | return &rest.Error{ |
| 42 | Err: err, |
| 43 | Status: http.StatusServiceUnavailable, |
| 44 | Message: fmt.Sprintf("Failed to execute delete topic command: %v", err.Error()), |
| 45 | InternalLogs: []slog.Attr{slog.String("topic_name", topicName)}, |
| 46 | IsSilent: false, |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | if len(res.Topics) != 1 { |
| 51 | return &rest.Error{ |
| 52 | Err: errors.New("topics array in response is empty"), |
| 53 | Status: http.StatusServiceUnavailable, |
| 54 | Message: "Unexpected Kafka response: No topics set in the response", |
| 55 | InternalLogs: []slog.Attr{slog.String("topic_name", topicName)}, |
| 56 | IsSilent: false, |
| 57 | } |
| 58 | } |
| 59 | |
| 60 | topicRes := res.Topics[0] |
| 61 | if err := newKafkaErrorWithDynamicMessage(topicRes.ErrorCode, topicRes.ErrorMessage); err != nil { |
| 62 | return &rest.Error{ |
| 63 | Err: err, |
| 64 | Status: http.StatusServiceUnavailable, |
| 65 | Message: fmt.Sprintf("Failed to delete Kafka topic: %v", err.Error()), |
| 66 | InternalLogs: []slog.Attr{slog.String("topic_name", topicName)}, |
| 67 | IsSilent: false, |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | return nil |
| 72 | } |
| 73 | |
| 74 | // DeleteTopics proxies the Kafka request/response between the Console service and Kafka. |
| 75 | func (s *Service) DeleteTopics(ctx context.Context, req *kmsg.DeleteTopicsRequest) (*kmsg.DeleteTopicsResponse, error) { |
nothing calls this directly
no test coverage detected