MCPcopy
hub / github.com/garethgeorge/backrest / Dequeue

Method Dequeue

internal/queue/timequeue.go:89–143  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

87}
88
89func (t *TimeQueue[T]) Dequeue(ctx context.Context) T {
90 t.dequeueMu.Lock()
91 defer t.dequeueMu.Unlock()
92
93 notify := make(chan struct{}, 1)
94 t.notify.Store(&notify)
95 defer t.notify.Store(nil)
96
97 for {
98 t.mu.Lock()
99 var wait time.Duration
100 if t.heap.Len() > 0 {
101 val := t.heap.Peek()
102 wait = time.Until(val.at)
103 if wait <= 0 {
104 defer t.mu.Unlock()
105 return heap.Pop(&t.heap).(timeQueueEntry[T]).v
106 }
107 }
108 if wait == 0 || wait > 3*time.Minute {
109 wait = 3 * time.Minute
110 }
111 t.mu.Unlock()
112
113 timer := time.NewTimer(wait)
114
115 select {
116 case <-timer.C:
117 t.mu.Lock()
118 if len(t.heap) == 0 {
119 t.mu.Unlock()
120 continue
121 }
122 val := t.heap.Peek()
123 if val.at.After(time.Now()) {
124 t.mu.Unlock()
125 continue
126 }
127 heap.Pop(&t.heap)
128 t.mu.Unlock()
129 return val.v
130 case <-notify: // new task was added, loop again to ensure we have the earliest task.
131 if !timer.Stop() {
132 <-timer.C
133 }
134 continue
135 case <-ctx.Done():
136 if !timer.Stop() {
137 <-timer.C
138 }
139 var zero T
140 return zero
141 }
142 }
143}
144
145type timeQueueEntry[T any] struct {
146 at time.Time

Callers 10

TestTPQPriorityFunction · 0.45
TestTPQStressFunction · 0.45
TestTPQRemoveFunction · 0.45
TestTPQResetFunction · 0.45
TestTimeQueueFunction · 0.45
TestFuzzTimeQueueFunction · 0.45
RunMethod · 0.45

Calls 6

LockMethod · 0.80
PopMethod · 0.80
DoneMethod · 0.80
UnlockMethod · 0.45
LenMethod · 0.45
PeekMethod · 0.45

Tested by 9

TestTPQPriorityFunction · 0.36
TestTPQStressFunction · 0.36
TestTPQRemoveFunction · 0.36
TestTPQResetFunction · 0.36
TestTimeQueueFunction · 0.36
TestFuzzTimeQueueFunction · 0.36