( getPending func() *[]taggedPacket, deliver func(taggedPacket), )
| 201 | } |
| 202 | |
| 203 | func (dp *DeliveryPipe) deliveryLoop( |
| 204 | getPending func() *[]taggedPacket, |
| 205 | deliver func(taggedPacket), |
| 206 | ) { |
| 207 | for { |
| 208 | dp.lock.Lock() |
| 209 | if dp.closed { |
| 210 | dp.lock.Unlock() |
| 211 | return |
| 212 | } |
| 213 | |
| 214 | pending := getPending() |
| 215 | now := time.Now() |
| 216 | |
| 217 | // Find all packets ready for delivery (deliveryTime <= now) |
| 218 | readyCount := 0 |
| 219 | for _, pkt := range *pending { |
| 220 | if pkt.deliveryTime.After(now) { |
| 221 | break |
| 222 | } |
| 223 | readyCount++ |
| 224 | } |
| 225 | |
| 226 | // Extract ready packets |
| 227 | ready := make([]taggedPacket, readyCount) |
| 228 | copy(ready, (*pending)[:readyCount]) |
| 229 | *pending = (*pending)[readyCount:] |
| 230 | |
| 231 | dp.lock.Unlock() |
| 232 | |
| 233 | // Deliver all ready packets (outside lock) |
| 234 | for _, pkt := range ready { |
| 235 | deliver(pkt) |
| 236 | } |
| 237 | |
| 238 | // Always sleep 1ms - simple busy loop |
| 239 | time.Sleep(1 * time.Millisecond) |
| 240 | } |
| 241 | } |
| 242 | |
| 243 | func (dp *DeliveryPipe) Close() { |
| 244 | dp.lock.Lock() |
no outgoing calls
no test coverage detected