FetchMessages is in charge of fulfilling the topic consume request. This is tricky in many cases, often due to the fact that we can't consume backwards, but we offer users to consume the most recent messages.
(ctx context.Context, cl *kgo.Client, progress IListMessagesProgress, consumeReq TopicConsumeRequest)
| 826 | // in many cases, often due to the fact that we can't consume backwards, but we offer |
| 827 | // users to consume the most recent messages. |
| 828 | func (s *Service) fetchMessages(ctx context.Context, cl *kgo.Client, progress IListMessagesProgress, consumeReq TopicConsumeRequest) error { |
| 829 | // 1. Assign partitions with right start offsets and create client |
| 830 | partitionOffsets := make(map[string]map[int32]kgo.Offset) |
| 831 | partitionOffsets[consumeReq.TopicName] = make(map[int32]kgo.Offset) |
| 832 | for _, req := range consumeReq.Partitions { |
| 833 | offset := kgo.NewOffset().At(req.StartOffset) |
| 834 | partitionOffsets[consumeReq.TopicName][req.PartitionID] = offset |
| 835 | } |
| 836 | |
| 837 | opts := append(cl.Opts(), kgo.ConsumePartitions(partitionOffsets)) |
| 838 | client, err := kgo.NewClient(opts...) |
| 839 | if err != nil { |
| 840 | return fmt.Errorf("failed to create new kafka client: %w", err) |
| 841 | } |
| 842 | defer client.Close() |
| 843 | |
| 844 | // 2. Create consumer workers |
| 845 | // Reduced from 100 to 20 to limit memory usage in serverless environments |
| 846 | // With large records (up to 1MB), 100 records = 1GB+ after deserialization |
| 847 | jobs := make(chan *kgo.Record, 20) |
| 848 | resultsCh := make(chan *TopicMessage, 20) |
| 849 | workerCtx, cancel := context.WithCancelCause(ctx) |
| 850 | defer cancel(errors.New("worker cancel")) |
| 851 | |
| 852 | wg := sync.WaitGroup{} |
| 853 | |
| 854 | // If we use more than one worker the order of messages in each partition gets lost. Hence we only use it where |
| 855 | // multiple workers are actually beneficial - for potentially high throughput stream requests. |
| 856 | workerCount := 1 |
| 857 | if consumeReq.FilterInterpreterCode != "" { |
| 858 | workerCount = 6 |
| 859 | } |
| 860 | for i := 0; i < workerCount; i++ { |
| 861 | // Setup JavaScript interpreter |
| 862 | isMessageOK, err := s.setupInterpreter(consumeReq.FilterInterpreterCode) |
| 863 | if err != nil { |
| 864 | s.logger.ErrorContext(ctx, "failed to setup interpreter", slog.Any("error", err)) |
| 865 | progress.OnError(fmt.Sprintf("failed to setup interpreter: %v", err.Error())) |
| 866 | return err |
| 867 | } |
| 868 | |
| 869 | wg.Add(1) |
| 870 | go s.startMessageWorker(workerCtx, &wg, isMessageOK, jobs, resultsCh, |
| 871 | consumeReq) |
| 872 | } |
| 873 | // Close the results channel once all workers have finished processing jobs and therefore no senders are left anymore |
| 874 | go func() { |
| 875 | wg.Wait() |
| 876 | close(resultsCh) |
| 877 | }() |
| 878 | |
| 879 | // 3. Start go routine that consumes messages from Kafka and produces these records on the jobs channel so that these |
| 880 | // can be decoded by our workers. |
| 881 | go s.consumeKafkaMessages(workerCtx, client, consumeReq, jobs) |
| 882 | |
| 883 | // 4. Receive decoded messages until our request is satisfied. Once that's the case we will cancel the context |
| 884 | // that propagate to all the launched go routines. |
| 885 | messageCount := 0 |
no test coverage detected