Drain allocates and returns an array of things Pushed in to the circular buffer. Push order is not maintained; that is, if B was Pushed after A, drain may return B at a lower index than A in the returned array.
()
| 236 | // buffer. Push order is not maintained; that is, if B was Pushed after A, |
| 237 | // drain may return B at a lower index than A in the returned array. |
| 238 | func (cb *CircularBuffer) Drain() []any { |
| 239 | cb.drainMutex.Lock() |
| 240 | |
| 241 | qs := make([]*queue, len(cb.qp)) |
| 242 | for i := 0; i < len(cb.qp); i++ { |
| 243 | qs[i] = cb.qp[i].switchQueues() |
| 244 | } |
| 245 | |
| 246 | var wg sync.WaitGroup |
| 247 | wg.Add(len(qs)) |
| 248 | for i := 0; i < len(qs); i++ { |
| 249 | go func(qi int) { |
| 250 | qs[qi].drainWait() |
| 251 | wg.Done() |
| 252 | }(i) |
| 253 | } |
| 254 | wg.Wait() |
| 255 | |
| 256 | result := make([]any, 0) |
| 257 | for i := 0; i < len(qs); i++ { |
| 258 | if acquired := atomic.LoadUint32(&qs[i].acquired); acquired < qs[i].size { |
| 259 | result = dereferenceAppend(result, qs[i].arr, 0, acquired) |
| 260 | } else { |
| 261 | result = dereferenceAppend(result, qs[i].arr, 0, qs[i].size) |
| 262 | } |
| 263 | } |
| 264 | |
| 265 | for i := 0; i < len(qs); i++ { |
| 266 | atomic.StoreUint32(&qs[i].acquired, 0) |
| 267 | atomic.StoreUint32(&qs[i].written, 0) |
| 268 | } |
| 269 | |
| 270 | cb.drainMutex.Unlock() |
| 271 | return result |
| 272 | } |