Write msg out compressed, given its uncompressed marshaled payload. The first return value indicates whether compression succeeded. If not, the caller should retry without compression.
(msg proto.Message, marshaled []byte)
| 827 | // The first return value indicates whether compression succeeded. |
| 828 | // If not, the caller should retry without compression. |
| 829 | func (c *rawConnection) writeCompressedMessage(msg proto.Message, marshaled []byte) (ok bool, err error) { |
| 830 | hdr := &bep.Header{ |
| 831 | Type: typeOf(msg), |
| 832 | Compression: bep.MessageCompression_MESSAGE_COMPRESSION_LZ4, |
| 833 | } |
| 834 | hdrSize := proto.Size(hdr) |
| 835 | if hdrSize > 1<<16-1 { |
| 836 | panic("impossibly large header") |
| 837 | } |
| 838 | |
| 839 | cOverhead := 2 + hdrSize + 4 |
| 840 | |
| 841 | metricDeviceSentUncompressedBytes.WithLabelValues(c.idString).Add(float64(cOverhead + len(marshaled))) |
| 842 | |
| 843 | // The compressed size may be at most n-n/32 = .96875*n bytes, |
| 844 | // I.e., if we can't save at least 3.125% bandwidth, we forgo compression. |
| 845 | // This number is arbitrary but cheap to compute. |
| 846 | maxCompressed := cOverhead + len(marshaled) - len(marshaled)/32 |
| 847 | buf := BufferPool.Get(maxCompressed) |
| 848 | defer BufferPool.Put(buf) |
| 849 | |
| 850 | compressedSize, err := lz4Compress(marshaled, buf[cOverhead:]) |
| 851 | totSize := compressedSize + cOverhead |
| 852 | if err != nil { |
| 853 | return false, nil |
| 854 | } |
| 855 | |
| 856 | // Header length |
| 857 | binary.BigEndian.PutUint16(buf, uint16(hdrSize)) |
| 858 | // Header |
| 859 | if _, err := protoutil.MarshalTo(buf[2:], hdr); err != nil { |
| 860 | return true, fmt.Errorf("marshalling header: %w", err) |
| 861 | } |
| 862 | // Message length |
| 863 | binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(compressedSize)) |
| 864 | |
| 865 | n, err := c.cw.Write(buf[:totSize]) |
| 866 | l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message (%d uncompressed)), err=%v", n, hdrSize, compressedSize, len(marshaled), err) |
| 867 | if err != nil { |
| 868 | return true, fmt.Errorf("writing message: %w", err) |
| 869 | } |
| 870 | return true, nil |
| 871 | } |
| 872 | |
| 873 | func typeOf(msg proto.Message) bep.MessageType { |
| 874 | switch msg.(type) { |
no test coverage detected