MCPcopy
hub / github.com/redpanda-data/console / CreateTopic

Method CreateTopic

backend/pkg/console/create_topic.go:38–101  ·  view source on GitHub ↗

CreateTopic creates a Kafka topic.

(ctx context.Context, createTopicReq kmsg.CreateTopicsRequestTopic)

Source from the content-addressed store, hash-verified

36
37// CreateTopic creates a Kafka topic.
38func (s *Service) CreateTopic(ctx context.Context, createTopicReq kmsg.CreateTopicsRequestTopic) (CreateTopicResponse, *rest.Error) {
39 cl, _, err := s.kafkaClientFactory.GetKafkaClient(ctx)
40 if err != nil {
41 return CreateTopicResponse{}, errorToRestError(err)
42 }
43
44 internalLogs := []slog.Attr{
45 slog.String("topic_name", createTopicReq.Topic),
46 slog.Int("partition_count", int(createTopicReq.NumPartitions)),
47 slog.Int("replication_factor", int(createTopicReq.ReplicationFactor)),
48 slog.Int("configuration_count", len(createTopicReq.Configs)),
49 }
50
51 req := kmsg.NewCreateTopicsRequest()
52 req.Topics = []kmsg.CreateTopicsRequestTopic{createTopicReq}
53
54 createRes, err := req.RequestWith(ctx, cl)
55 if err != nil {
56 return CreateTopicResponse{}, &rest.Error{
57 Err: fmt.Errorf("failed to create topic: %w", err),
58 Status: http.StatusServiceUnavailable,
59 Message: fmt.Sprintf("Failed to create topic: %v", err.Error()),
60 InternalLogs: internalLogs,
61 IsSilent: false,
62 }
63 }
64
65 if len(createRes.Topics) != 1 {
66 return CreateTopicResponse{}, &rest.Error{
67 Err: fmt.Errorf("unexpected number of topic responses, expected exactly one but got '%v'", len(createRes.Topics)),
68 Status: http.StatusInternalServerError,
69 Message: fmt.Sprintf("unexpected number of topic responses, expected exactly one but got '%v'", len(createRes.Topics)),
70 InternalLogs: internalLogs,
71 IsSilent: false,
72 }
73 }
74
75 createTopicRes := createRes.Topics[0]
76 kafkaErr := newKafkaErrorWithDynamicMessage(createTopicRes.ErrorCode, createTopicRes.ErrorMessage)
77 if kafkaErr != nil {
78 return CreateTopicResponse{}, &rest.Error{
79 Err: fmt.Errorf("failed to create topic, inner kafka error: %w", kafkaErr),
80 Status: http.StatusServiceUnavailable,
81 Message: fmt.Sprintf("Failed to create topic, kafka responded with the following error: %v", kafkaErr.Error()),
82 InternalLogs: internalLogs,
83 IsSilent: false,
84 }
85 }
86
87 configs := make([]CreateTopicResponseConfig, len(createTopicRes.Configs))
88 for i, cfg := range createTopicRes.Configs {
89 configs[i] = CreateTopicResponseConfig{
90 Name: cfg.Name,
91 Value: cfg.Value,
92 }
93 }
94
95 return CreateTopicResponse{

Callers

nothing calls this directly

Calls 5

errorToRestError (Function) · 0.85
GetKafkaClient (Method) · 0.65
String (Method) · 0.45
Error (Method) · 0.45

Tested by

no test coverage detected