(maxBatchSize int)
| 475 | } |
| 476 | |
| 477 | func (peer *Peer) RoutineSequentialSender(maxBatchSize int) { |
| 478 | device := peer.device |
| 479 | defer func() { |
| 480 | defer device.log.Verbosef("%v - Routine: sequential sender - stopped", peer) |
| 481 | peer.stopping.Done() |
| 482 | }() |
| 483 | device.log.Verbosef("%v - Routine: sequential sender - started", peer) |
| 484 | |
| 485 | bufs := make([][]byte, 0, maxBatchSize) |
| 486 | |
| 487 | for elemsContainer := range peer.queue.outbound.c { |
| 488 | bufs = bufs[:0] |
| 489 | if elemsContainer == nil { |
| 490 | return |
| 491 | } |
| 492 | if !peer.isRunning.Load() { |
| 493 | // peer has been stopped; return re-usable elems to the shared pool. |
| 494 | // This is an optimization only. It is possible for the peer to be stopped |
| 495 | // immediately after this check, in which case, elem will get processed. |
| 496 | // The timers and SendBuffers code are resilient to a few stragglers. |
| 497 | // TODO: rework peer shutdown order to ensure |
| 498 | // that we never accidentally keep timers alive longer than necessary. |
| 499 | elemsContainer.Lock() |
| 500 | for _, elem := range elemsContainer.elems { |
| 501 | device.PutMessageBuffer(elem.buffer) |
| 502 | device.PutOutboundElement(elem) |
| 503 | } |
| 504 | device.PutOutboundElementsContainer(elemsContainer) |
| 505 | continue |
| 506 | } |
| 507 | dataSent := false |
| 508 | elemsContainer.Lock() |
| 509 | for _, elem := range elemsContainer.elems { |
| 510 | if len(elem.packet) != MessageKeepaliveSize { |
| 511 | dataSent = true |
| 512 | } |
| 513 | bufs = append(bufs, elem.packet) |
| 514 | } |
| 515 | |
| 516 | peer.timersAnyAuthenticatedPacketTraversal() |
| 517 | peer.timersAnyAuthenticatedPacketSent() |
| 518 | |
| 519 | err := peer.SendBuffers(bufs) |
| 520 | if dataSent { |
| 521 | peer.timersDataSent() |
| 522 | } |
| 523 | for _, elem := range elemsContainer.elems { |
| 524 | device.PutMessageBuffer(elem.buffer) |
| 525 | device.PutOutboundElement(elem) |
| 526 | } |
| 527 | device.PutOutboundElementsContainer(elemsContainer) |
| 528 | if err != nil { |
| 529 | var errGSO conn.ErrUDPGSODisabled |
| 530 | if errors.As(err, &errGSO) { |
| 531 | device.log.Verbosef(err.Error()) |
| 532 | err = errGSO.RetryErr |
| 533 | } |
| 534 | } |
no test coverage detected