(reader io.Reader)
| 21 | } |
| 22 | |
| 23 | func (d *decoder) Decode(reader io.Reader) (<-chan message.Message, <-chan error) { |
| 24 | result := make(chan message.Message) |
| 25 | errs := make(chan error) |
| 26 | |
| 27 | version, err := readHeader(reader, CurrentVersion) |
| 28 | if err != nil { |
| 29 | go func() { |
| 30 | errs <- err |
| 31 | close(result) |
| 32 | close(errs) |
| 33 | }() |
| 34 | return result, errs |
| 35 | } |
| 36 | |
| 37 | gzipReader, err := gzip.NewReader(reader) |
| 38 | if err != nil { |
| 39 | go func() { |
| 40 | errs <- fmt.Errorf("failed to open gzip stream (%w)", err) |
| 41 | close(result) |
| 42 | close(errs) |
| 43 | }() |
| 44 | return result, errs |
| 45 | } |
| 46 | |
| 47 | cborReader := cbor.NewDecoder(gzipReader) |
| 48 | |
| 49 | go func() { |
| 50 | switch version { |
| 51 | case 1: |
| 52 | var messages []decodedMessage |
| 53 | if err = cborReader.Decode(&messages); err != nil { |
| 54 | errs <- fmt.Errorf("failed to decode messages (%w)", err) |
| 55 | close(result) |
| 56 | close(errs) |
| 57 | return |
| 58 | } |
| 59 | for _, v := range messages { |
| 60 | decodedMessage, err := decodeMessage(v) |
| 61 | if err != nil { |
| 62 | errs <- err |
| 63 | } else { |
| 64 | result <- *decodedMessage |
| 65 | } |
| 66 | } |
| 67 | case 2: |
| 68 | for { |
| 69 | var msg decodedMessage |
| 70 | if err = cborReader.Decode(&msg); err != nil { |
| 71 | if !errors.Is(err, io.ErrUnexpectedEOF) { |
| 72 | errs <- fmt.Errorf("failed to decode messages (%w)", err) |
| 73 | } |
| 74 | close(result) |
| 75 | close(errs) |
| 76 | return |
| 77 | } |
| 78 | decodedMessage, err := decodeMessage(msg) |
| 79 | if err != nil { |
| 80 | errs <- err |
Nothing calls this directly.
No test coverage detected.