MCPcopy
hub / github.com/livekit/livekit / TestConcurrentConsumption

Function TestConcurrentConsumption

pkg/utils/incrementaldispatcher_test.go:50–97  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

48}
49
50func TestConcurrentConsumption(t *testing.T) {
51 producer := utils.NewIncrementalDispatcher[int]()
52 numConsumers := 100
53 sums := make([]atomic.Int32, numConsumers)
54 var wg sync.WaitGroup
55
56 for i := range numConsumers {
57 wg.Add(1)
58 i := i
59 go func() {
60 defer wg.Done()
61 producer.ForEach(func(item int) {
62 sums[i].Add(int32(item))
63 })
64 }()
65 }
66
67 // Add items
68 expectedSum := 0
69 for i := range 20 {
70 expectedSum += i
71 producer.Add(i)
72 }
73
74 for i := range numConsumers {
75 testutils.WithTimeout(t, func() string {
76 if sums[i].Load() != int32(expectedSum) {
77 return fmt.Sprintf("consumer %d did not consume all the items. expected %d, actual: %d",
78 i, expectedSum, sums[i].Load())
79 }
80 return ""
81 }, time.Second)
82 }
83
84 // keep adding and ensure it's consumed
85 for i := 20; i < 30; i++ {
86 expectedSum += i
87 producer.Add(i)
88 }
89
90 // wait for all consumers to finish
91 producer.Done()
92 wg.Wait()
93
94 for i := range numConsumers {
95 require.Equal(t, int32(expectedSum), sums[i].Load(), "consumer %d did not match", i)
96 }
97}

Callers

nothing calls this directly

Calls 5

WithTimeoutFunction · 0.92
DoneMethod · 0.80
ForEachMethod · 0.80
LoadMethod · 0.65
AddMethod · 0.45

Tested by

no test coverage detected