CreateTopic creates a Kafka topic.
(ctx context.Context, createTopicReq kmsg.CreateTopicsRequestTopic)
| 36 | |
| 37 | // CreateTopic creates a Kafka topic. |
| 38 | func (s *Service) CreateTopic(ctx context.Context, createTopicReq kmsg.CreateTopicsRequestTopic) (CreateTopicResponse, *rest.Error) { |
| 39 | cl, _, err := s.kafkaClientFactory.GetKafkaClient(ctx) |
| 40 | if err != nil { |
| 41 | return CreateTopicResponse{}, errorToRestError(err) |
| 42 | } |
| 43 | |
| 44 | internalLogs := []slog.Attr{ |
| 45 | slog.String("topic_name", createTopicReq.Topic), |
| 46 | slog.Int("partition_count", int(createTopicReq.NumPartitions)), |
| 47 | slog.Int("replication_factor", int(createTopicReq.ReplicationFactor)), |
| 48 | slog.Int("configuration_count", len(createTopicReq.Configs)), |
| 49 | } |
| 50 | |
| 51 | req := kmsg.NewCreateTopicsRequest() |
| 52 | req.Topics = []kmsg.CreateTopicsRequestTopic{createTopicReq} |
| 53 | |
| 54 | createRes, err := req.RequestWith(ctx, cl) |
| 55 | if err != nil { |
| 56 | return CreateTopicResponse{}, &rest.Error{ |
| 57 | Err: fmt.Errorf("failed to create topic: %w", err), |
| 58 | Status: http.StatusServiceUnavailable, |
| 59 | Message: fmt.Sprintf("Failed to create topic: %v", err.Error()), |
| 60 | InternalLogs: internalLogs, |
| 61 | IsSilent: false, |
| 62 | } |
| 63 | } |
| 64 | |
| 65 | if len(createRes.Topics) != 1 { |
| 66 | return CreateTopicResponse{}, &rest.Error{ |
| 67 | Err: fmt.Errorf("unexpected number of topic responses, expected exactly one but got '%v'", len(createRes.Topics)), |
| 68 | Status: http.StatusInternalServerError, |
| 69 | Message: fmt.Sprintf("unexpected number of topic responses, expected exactly one but got '%v'", len(createRes.Topics)), |
| 70 | InternalLogs: internalLogs, |
| 71 | IsSilent: false, |
| 72 | } |
| 73 | } |
| 74 | |
| 75 | createTopicRes := createRes.Topics[0] |
| 76 | kafkaErr := newKafkaErrorWithDynamicMessage(createTopicRes.ErrorCode, createTopicRes.ErrorMessage) |
| 77 | if kafkaErr != nil { |
| 78 | return CreateTopicResponse{}, &rest.Error{ |
| 79 | Err: fmt.Errorf("failed to create topic, inner kafka error: %w", kafkaErr), |
| 80 | Status: http.StatusServiceUnavailable, |
| 81 | Message: fmt.Sprintf("Failed to create topic, kafka responded with the following error: %v", kafkaErr.Error()), |
| 82 | InternalLogs: internalLogs, |
| 83 | IsSilent: false, |
| 84 | } |
| 85 | } |
| 86 | |
| 87 | configs := make([]CreateTopicResponseConfig, len(createTopicRes.Configs)) |
| 88 | for i, cfg := range createTopicRes.Configs { |
| 89 | configs[i] = CreateTopicResponseConfig{ |
| 90 | Name: cfg.Name, |
| 91 | Value: cfg.Value, |
| 92 | } |
| 93 | } |
| 94 | |
| 95 | return CreateTopicResponse{ |
Nothing calls this function directly.
No test coverage detected.