MCPcopy Index your code
hub / github.com/wavetermdev/waveterm / QueueItem

Method QueueItem

pkg/utilds/quickreorderqueue.go:116–152  ·  view source on GitHub ↗
(sessionId string, seqNum int, data T)

Source from the content-addressed store, hash-verified

114}
115
116func (q *QuickReorderQueue[T]) QueueItem(sessionId string, seqNum int, data T) error {
117 q.lock.Lock()
118 defer q.lock.Unlock()
119
120 if q.closed {
121 return fmt.Errorf("ReorderQueue is closed, cannot queue new item")
122 }
123
124 if len(q.buffer)+len(q.outCh) >= cap(q.outCh) {
125 return fmt.Errorf("queue is full, cannot accept new items, cap: %d", cap(q.outCh))
126 }
127
128 q.ensureSessionTs_withlock(sessionId)
129
130 cmp := q.cmpSessionSeq_withlock(sessionId, seqNum, q.currentSessionId, q.nextSeqNum)
131
132 if cmp < 0 || seqNum == 0 || sessionId == "" {
133 q.outCh <- data
134 return nil
135 }
136
137 if cmp == 0 {
138 q.outCh <- data
139 q.nextSeqNum++
140 q.processBuffer_withlock()
141 return nil
142 }
143
144 q.buffer = append(q.buffer, queuedItem[T]{
145 sessionId: sessionId,
146 seqNum: seqNum,
147 data: data,
148 timestamp: time.Now(),
149 })
150 q.ensureTimer_withlock()
151 return nil
152}
153
154func (q *QuickReorderQueue[T]) processBuffer_withlock() {
155 if len(q.buffer) == 0 {

Calls 4

ensureTimer_withlockMethod · 0.95