MCPcopy
hub / github.com/redpanda-data/console / fetchMessages

Method fetchMessages

backend/pkg/console/list_messages.go:828–932  ·  view source on GitHub ↗

FetchMessages is in charge of fulfilling the topic consume request. This is tricky in many cases, often due to the fact that we can't consume backwards, but we offer users to consume the most recent messages.

(ctx context.Context, cl *kgo.Client, progress IListMessagesProgress, consumeReq TopicConsumeRequest)

Source from the content-addressed store, hash-verified

826// in many cases, often due to the fact that we can't consume backwards, but we offer
827// users to consume the most recent messages.
828func (s *Service) fetchMessages(ctx context.Context, cl *kgo.Client, progress IListMessagesProgress, consumeReq TopicConsumeRequest) error {
829 // 1. Assign partitions with right start offsets and create client
830 partitionOffsets := make(map[string]map[int32]kgo.Offset)
831 partitionOffsets[consumeReq.TopicName] = make(map[int32]kgo.Offset)
832 for _, req := range consumeReq.Partitions {
833 offset := kgo.NewOffset().At(req.StartOffset)
834 partitionOffsets[consumeReq.TopicName][req.PartitionID] = offset
835 }
836
837 opts := append(cl.Opts(), kgo.ConsumePartitions(partitionOffsets))
838 client, err := kgo.NewClient(opts...)
839 if err != nil {
840 return fmt.Errorf("failed to create new kafka client: %w", err)
841 }
842 defer client.Close()
843
844 // 2. Create consumer workers
845 // Reduced from 100 to 20 to limit memory usage in serverless environments
846 // With large records (up to 1MB), 100 records = 1GB+ after deserialization
847 jobs := make(chan *kgo.Record, 20)
848 resultsCh := make(chan *TopicMessage, 20)
849 workerCtx, cancel := context.WithCancelCause(ctx)
850 defer cancel(errors.New("worker cancel"))
851
852 wg := sync.WaitGroup{}
853
854 // If we use more than one worker the order of messages in each partition gets lost. Hence we only use it where
855 // multiple workers are actually beneficial - for potentially high throughput stream requests.
856 workerCount := 1
857 if consumeReq.FilterInterpreterCode != "" {
858 workerCount = 6
859 }
860 for i := 0; i < workerCount; i++ {
861 // Setup JavaScript interpreter
862 isMessageOK, err := s.setupInterpreter(consumeReq.FilterInterpreterCode)
863 if err != nil {
864 s.logger.ErrorContext(ctx, "failed to setup interpreter", slog.Any("error", err))
865 progress.OnError(fmt.Sprintf("failed to setup interpreter: %v", err.Error()))
866 return err
867 }
868
869 wg.Add(1)
870 go s.startMessageWorker(workerCtx, &wg, isMessageOK, jobs, resultsCh,
871 consumeReq)
872 }
873 // Close the results channel once all workers have finished processing jobs and therefore no senders are left anymore
874 go func() {
875 wg.Wait()
876 close(resultsCh)
877 }()
878
879 // 3. Start go routine that consumes messages from Kafka and produces these records on the jobs channel so that these
880 // can be decoded by our workers.
881 go s.consumeKafkaMessages(workerCtx, client, consumeReq, jobs)
882
883 // 4. Receive decoded messages until our request is satisfied. Once that's the case we will cancel the context
884 // that propagate to all the launched go routines.
885 messageCount := 0

Callers 1

ListMessagesMethod · 0.95

Calls 9

setupInterpreterMethod · 0.95
startMessageWorkerMethod · 0.95
consumeKafkaMessagesMethod · 0.95
closeFunction · 0.85
OnErrorMethod · 0.65
OnMessageConsumedMethod · 0.65
OnMessageMethod · 0.65
cancelFunction · 0.50
ErrorMethod · 0.45

Tested by

no test coverage detected