TestQueueConcurrency tests the concurrency safety of Queue
(t *testing.T)
| 126 | |
| 127 | // TestQueueConcurrency tests the concurrency safety of Queue |
| 128 | func TestQueueConcurrency(t *testing.T) { |
| 129 | // Initialize a new queue |
| 130 | q := New[int](100) |
| 131 | |
| 132 | // Number of goroutines and operations |
| 133 | goroutines := 10 |
| 134 | operations := 100 |
| 135 | |
| 136 | // Wait group to synchronize goroutines |
| 137 | wg := sync.WaitGroup{} |
| 138 | wg.Add(goroutines * 2) // For both producers and consumers |
| 139 | |
| 140 | // Start producer goroutines |
| 141 | for i := 0; i < goroutines; i++ { |
| 142 | go func(id int) { |
| 143 | defer wg.Done() |
| 144 | for j := 0; j < operations; j++ { |
| 145 | q.Put(id*operations + j) |
| 146 | // Small sleep to increase chance of race conditions |
| 147 | time.Sleep(time.Microsecond) |
| 148 | } |
| 149 | }(i) |
| 150 | } |
| 151 | |
| 152 | // Start consumer goroutines |
| 153 | consumed := make(chan int, goroutines*operations) |
| 154 | for i := 0; i < goroutines; i++ { |
| 155 | go func() { |
| 156 | defer wg.Done() |
| 157 | for j := 0; j < operations; j++ { |
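| 158 | // Try to take an item without blocking when the queue is empty. |
| 159 | // Hold the queue's own lock across the emptiness check and the removal so no other goroutine can interleave between them, as could happen with separate Len() and Pop() calls. |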
| 160 | q.lock.Lock() |
| 161 | if len(q.items) > 0 { |
| 162 | item := q.items[0] |
| 163 | q.items = q.items[1:] |
| 164 | q.lock.Unlock() |
| 165 | consumed <- item |
| 166 | } else { |
| 167 | q.lock.Unlock() |
| 168 | } |
| 169 | // Small sleep to increase chance of race conditions |
| 170 | time.Sleep(time.Microsecond) |
| 171 | } |
| 172 | }() |
| 173 | } |
| 174 | |
| 175 | // Wait for all goroutines to finish |
| 176 | wg.Wait() |
| 177 | // Close the consumed channel |
| 178 | close(consumed) |
| 179 | |
| 180 | // Count the number of consumed items |
| 181 | consumedCount := 0 |
| 182 | for range consumed { |
| 183 | consumedCount++ |
| 184 | } |
| 185 |