(hdr *bep.Header, fourByteBuf []byte)
| 513 | } |
| 514 | |
| 515 | func (c *rawConnection) readMessageAfterHeader(hdr *bep.Header, fourByteBuf []byte) (proto.Message, error) { |
| 516 | // First comes a 4 byte message length |
| 517 | |
| 518 | if _, err := io.ReadFull(c.cr, fourByteBuf[:4]); err != nil { |
| 519 | return nil, fmt.Errorf("reading message length: %w", err) |
| 520 | } |
| 521 | msgLen := int32(binary.BigEndian.Uint32(fourByteBuf)) |
| 522 | if msgLen < 0 { |
| 523 | return nil, fmt.Errorf("negative message length %d", msgLen) |
| 524 | } else if msgLen > MaxMessageLen { |
| 525 | return nil, fmt.Errorf("message length %d exceeds maximum %d", msgLen, MaxMessageLen) |
| 526 | } |
| 527 | |
| 528 | // Then comes the message |
| 529 | |
| 530 | buf := BufferPool.Get(int(msgLen)) |
| 531 | defer BufferPool.Put(buf) |
| 532 | |
| 533 | if _, err := io.ReadFull(c.cr, buf); err != nil { |
| 534 | return nil, fmt.Errorf("reading message: %w", err) |
| 535 | } |
| 536 | |
| 537 | // ... which might be compressed |
| 538 | |
| 539 | switch hdr.Compression { |
| 540 | case bep.MessageCompression_MESSAGE_COMPRESSION_NONE: |
| 541 | // Nothing |
| 542 | |
| 543 | case bep.MessageCompression_MESSAGE_COMPRESSION_LZ4: |
| 544 | decomp, err := lz4Decompress(buf) |
| 545 | if err != nil { |
| 546 | return nil, fmt.Errorf("decompressing message: %w", err) |
| 547 | } |
| 548 | buf = decomp |
| 549 | |
| 550 | default: |
| 551 | return nil, fmt.Errorf("unknown message compression %d", hdr.Compression) |
| 552 | } |
| 553 | |
| 554 | // ... and is then unmarshalled |
| 555 | |
| 556 | metricDeviceRecvDecompressedBytes.WithLabelValues(c.idString).Add(float64(4 + len(buf))) |
| 557 | |
| 558 | msg, err := newMessage(hdr.Type) |
| 559 | if err != nil { |
| 560 | return nil, err |
| 561 | } |
| 562 | if err := proto.Unmarshal(buf, msg); err != nil { |
| 563 | return nil, fmt.Errorf("unmarshalling message: %w", err) |
| 564 | } |
| 565 | |
| 566 | return msg, nil |
| 567 | } |
| 568 | |
| 569 | func (c *rawConnection) readHeader(fourByteBuf []byte) (*bep.Header, error) { |
| 570 | // First comes a 2 byte header length |
no test coverage detected