| 2830 | func (*flushMarker) Error() string { return "keploy: capture flush marker" } |
| 2831 | |
| 2832 | func (p *Proxy) StartErrorDrain(ctx context.Context) { |
| 2833 | p.errDrainOnce.Do(func() { |
| 2834 | p.errDrainActive.Store(true) |
| 2835 | var discarded atomic.Int64 |
| 2836 | go func() { |
| 2837 | defer utils.Recover(p.logger) |
| 2838 | defer p.errDrainActive.Store(false) |
| 2839 | for { |
| 2840 | select { |
| 2841 | case <-ctx.Done(): |
| 2842 | return |
| 2843 | case err, ok := <-p.errChannel: |
| 2844 | if !ok { |
| 2845 | return |
| 2846 | } |
| 2847 | // Flush-marker rendezvous: signal that we've reached it |
| 2848 | // (all prior errors are already routed) and move on. |
| 2849 | if m, isMarker := err.(*flushMarker); isMarker { |
| 2850 | close(m.done) |
| 2851 | continue |
| 2852 | } |
| 2853 | // Route under captureMu so the window can't be swapped |
| 2854 | // between the Load and the add. The accumulator is bounded |
| 2855 | // too, so even a window left open (e.g. an error path that |
| 2856 | // skipped GetMockErrors) can't grow without limit. |
| 2857 | p.captureMu.Lock() |
| 2858 | acc := p.activeTestErrors.Load() |
| 2859 | if acc != nil { |
| 2860 | acc.addBounded(err, maxPendingMockErrors) |
| 2861 | p.captureMu.Unlock() |
| 2862 | continue |
| 2863 | } |
| 2864 | retained := isMockNotFoundErr(err) && p.pendingMockErrors.addBounded(err, maxPendingMockErrors) |
| 2865 | p.captureMu.Unlock() |
| 2866 | if !retained { |
| 2867 | // Log only the first discard and then every 100th to reduce noise. |
| 2868 | n := discarded.Add(1) |
| 2869 | if n == 1 || n%100 == 0 { |
| 2870 | p.logger.Debug("discarding mock error outside active test", |
| 2871 | zap.Error(err), zap.Int64("totalDiscarded", n)) |
| 2872 | } |
| 2873 | } |
| 2874 | } |
| 2875 | } |
| 2876 | }() |
| 2877 | }) |
| 2878 | } |
| 2879 | |
| 2880 | // BeginTestErrorCapture starts collecting mock-not-found errors for the current |
| 2881 | // test case into a fresh per-test accumulator, so the replayer's GetMockErrors |