MCPcopy Index your code
hub / github.com/wavetermdev/waveterm / deliveryLoop

Method deliveryLoop

cmd/test-streammanager/deliverypipe.go:203–241  ·  view source on GitHub ↗
(
	getPending func() *[]taggedPacket,
	deliver func(taggedPacket),
)

Source from the content-addressed store, hash-verified

201}
202
203func (dp *DeliveryPipe) deliveryLoop(
204 getPending func() *[]taggedPacket,
205 deliver func(taggedPacket),
206) {
207 for {
208 dp.lock.Lock()
209 if dp.closed {
210 dp.lock.Unlock()
211 return
212 }
213
214 pending := getPending()
215 now := time.Now()
216
217 // Find all packets ready for delivery (deliveryTime <= now)
218 readyCount := 0
219 for _, pkt := range *pending {
220 if pkt.deliveryTime.After(now) {
221 break
222 }
223 readyCount++
224 }
225
226 // Extract ready packets
227 ready := make([]taggedPacket, readyCount)
228 copy(ready, (*pending)[:readyCount])
229 *pending = (*pending)[readyCount:]
230
231 dp.lock.Unlock()
232
233 // Deliver all ready packets (outside lock)
234 for _, pkt := range ready {
235 deliver(pkt)
236 }
237
238 // Always sleep 1ms - simple busy loop
239 time.Sleep(1 * time.Millisecond)
240 }
241}
242
243func (dp *DeliveryPipe) Close() {
244 dp.lock.Lock()

Callers 2

dataDeliveryLoopMethod · 0.95
ackDeliveryLoopMethod · 0.95

Calls

no outgoing calls

Tested by

no test coverage detected