| 114 | } |
| 115 | |
| 116 | func (q *QuickReorderQueue[T]) QueueItem(sessionId string, seqNum int, data T) error { |
| 117 | q.lock.Lock() |
| 118 | defer q.lock.Unlock() |
| 119 | |
| 120 | if q.closed { |
| 121 | return fmt.Errorf("ReorderQueue is closed, cannot queue new item") |
| 122 | } |
| 123 | |
| 124 | if len(q.buffer)+len(q.outCh) >= cap(q.outCh) { |
| 125 | return fmt.Errorf("queue is full, cannot accept new items, cap: %d", cap(q.outCh)) |
| 126 | } |
| 127 | |
| 128 | q.ensureSessionTs_withlock(sessionId) |
| 129 | |
| 130 | cmp := q.cmpSessionSeq_withlock(sessionId, seqNum, q.currentSessionId, q.nextSeqNum) |
| 131 | |
| 132 | if cmp < 0 || seqNum == 0 || sessionId == "" { |
| 133 | q.outCh <- data |
| 134 | return nil |
| 135 | } |
| 136 | |
| 137 | if cmp == 0 { |
| 138 | q.outCh <- data |
| 139 | q.nextSeqNum++ |
| 140 | q.processBuffer_withlock() |
| 141 | return nil |
| 142 | } |
| 143 | |
| 144 | q.buffer = append(q.buffer, queuedItem[T]{ |
| 145 | sessionId: sessionId, |
| 146 | seqNum: seqNum, |
| 147 | data: data, |
| 148 | timestamp: time.Now(), |
| 149 | }) |
| 150 | q.ensureTimer_withlock() |
| 151 | return nil |
| 152 | } |
| 153 | |
| 154 | func (q *QuickReorderQueue[T]) processBuffer_withlock() { |
| 155 | if len(q.buffer) == 0 { |