| 89 | } |
| 90 | |
| 91 | func (t *TimePriorityQueue[T]) Dequeue(ctx context.Context) T { |
| 92 | t.tqueue.mu.Lock() |
| 93 | for { |
| 94 | for t.tqueue.heap.Len() > 0 { |
| 95 | thead := t.tqueue.heap.Peek() // peek at the head of the time queue |
| 96 | if thead.at.Before(time.Now()) { |
| 97 | tqe := heap.Pop(&t.tqueue.heap).(timeQueueEntry[priorityEntry[T]]) |
| 98 | heap.Push(&t.ready, tqe.v) |
| 99 | } else { |
| 100 | break |
| 101 | } |
| 102 | } |
| 103 | if t.ready.Len() > 0 { |
| 104 | defer t.tqueue.mu.Unlock() |
| 105 | return heap.Pop(&t.ready).(priorityEntry[T]).v |
| 106 | } |
| 107 | t.tqueue.mu.Unlock() |
| 108 | // wait for the next element to be ready |
| 109 | val := t.tqueue.Dequeue(ctx) |
| 110 | t.tqueue.mu.Lock() |
| 111 | heap.Push(&t.ready, val) |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | type priorityEntry[T equals[T]] struct { |
| 116 | at time.Time |