| 87 | } |
| 88 | |
| 89 | func (t *TimeQueue[T]) Dequeue(ctx context.Context) T { |
| 90 | t.dequeueMu.Lock() |
| 91 | defer t.dequeueMu.Unlock() |
| 92 | |
| 93 | notify := make(chan struct{}, 1) |
| 94 | t.notify.Store(¬ify) |
| 95 | defer t.notify.Store(nil) |
| 96 | |
| 97 | for { |
| 98 | t.mu.Lock() |
| 99 | var wait time.Duration |
| 100 | if t.heap.Len() > 0 { |
| 101 | val := t.heap.Peek() |
| 102 | wait = time.Until(val.at) |
| 103 | if wait <= 0 { |
| 104 | defer t.mu.Unlock() |
| 105 | return heap.Pop(&t.heap).(timeQueueEntry[T]).v |
| 106 | } |
| 107 | } |
| 108 | if wait == 0 || wait > 3*time.Minute { |
| 109 | wait = 3 * time.Minute |
| 110 | } |
| 111 | t.mu.Unlock() |
| 112 | |
| 113 | timer := time.NewTimer(wait) |
| 114 | |
| 115 | select { |
| 116 | case <-timer.C: |
| 117 | t.mu.Lock() |
| 118 | if len(t.heap) == 0 { |
| 119 | t.mu.Unlock() |
| 120 | continue |
| 121 | } |
| 122 | val := t.heap.Peek() |
| 123 | if val.at.After(time.Now()) { |
| 124 | t.mu.Unlock() |
| 125 | continue |
| 126 | } |
| 127 | heap.Pop(&t.heap) |
| 128 | t.mu.Unlock() |
| 129 | return val.v |
| 130 | case <-notify: // new task was added, loop again to ensure we have the earliest task. |
| 131 | if !timer.Stop() { |
| 132 | <-timer.C |
| 133 | } |
| 134 | continue |
| 135 | case <-ctx.Done(): |
| 136 | if !timer.Stop() { |
| 137 | <-timer.C |
| 138 | } |
| 139 | var zero T |
| 140 | return zero |
| 141 | } |
| 142 | } |
| 143 | } |
| 144 | |
| 145 | type timeQueueEntry[T any] struct { |
| 146 | at time.Time |