| 769 | } |
| 770 | |
| 771 | func (c *rawConnection) writeMessage(msg proto.Message) error { |
| 772 | msgContext, _ := messageContext(msg) |
| 773 | l.Debugf("Writing %v", msgContext) |
| 774 | |
| 775 | defer func() { |
| 776 | metricDeviceSentMessages.WithLabelValues(c.idString).Inc() |
| 777 | }() |
| 778 | |
| 779 | size := proto.Size(msg) |
| 780 | hdr := &bep.Header{ |
| 781 | Type: typeOf(msg), |
| 782 | } |
| 783 | hdrSize := proto.Size(hdr) |
| 784 | if hdrSize > 1<<16-1 { |
| 785 | panic("impossibly large header") |
| 786 | } |
| 787 | |
| 788 | overhead := 2 + hdrSize + 4 |
| 789 | totSize := overhead + size |
| 790 | buf := BufferPool.Get(totSize) |
| 791 | defer BufferPool.Put(buf) |
| 792 | |
| 793 | // Message |
| 794 | if _, err := protoutil.MarshalTo(buf[overhead:], msg); err != nil { |
| 795 | return fmt.Errorf("marshalling message: %w", err) |
| 796 | } |
| 797 | |
| 798 | if c.shouldCompressMessage(msg) { |
| 799 | ok, err := c.writeCompressedMessage(msg, buf[overhead:]) |
| 800 | if ok { |
| 801 | return err |
| 802 | } |
| 803 | } |
| 804 | |
| 805 | metricDeviceSentUncompressedBytes.WithLabelValues(c.idString).Add(float64(totSize)) |
| 806 | |
| 807 | // Header length |
| 808 | binary.BigEndian.PutUint16(buf, uint16(hdrSize)) |
| 809 | // Header |
| 810 | if _, err := protoutil.MarshalTo(buf[2:], hdr); err != nil { |
| 811 | return fmt.Errorf("marshalling header: %w", err) |
| 812 | } |
| 813 | // Message length |
| 814 | binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(size)) |
| 815 | |
| 816 | n, err := c.cw.Write(buf) |
| 817 | |
| 818 | l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message), err=%v", n, hdrSize, size, err) |
| 819 | if err != nil { |
| 820 | return fmt.Errorf("writing message: %w", err) |
| 821 | } |
| 822 | return nil |
| 823 | } |
| 824 | |
| 825 | // Write msg out compressed, given its uncompressed marshaled payload. |
| 826 | // |