Create Decoder: ASYNC: Spawn 3 go routines. 0: Read frames and decode block literals. 1: Decode sequences. 2: Execute sequences, send to output.
(ctx context.Context, r io.Reader, output chan decodeOutput)
| 653 | // 1: Decode sequences. |
| 654 | // 2: Execute sequences, send to output. |
| 655 | func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) { |
| 656 | defer d.streamWg.Done() |
| 657 | br := readerWrapper{r: r} |
| 658 | |
| 659 | var seqDecode = make(chan *blockDec, d.o.concurrent) |
| 660 | var seqExecute = make(chan *blockDec, d.o.concurrent) |
| 661 | |
| 662 | // Async 1: Decode sequences... |
| 663 | go func() { |
| 664 | var hist history |
| 665 | var hasErr bool |
| 666 | |
| 667 | for block := range seqDecode { |
| 668 | if hasErr { |
| 669 | if block != nil { |
| 670 | seqExecute <- block |
| 671 | } |
| 672 | continue |
| 673 | } |
| 674 | if block.async.newHist != nil { |
| 675 | if debugDecoder { |
| 676 | println("Async 1: new history, recent:", block.async.newHist.recentOffsets) |
| 677 | } |
| 678 | hist.reset() |
| 679 | hist.decoders = block.async.newHist.decoders |
| 680 | hist.recentOffsets = block.async.newHist.recentOffsets |
| 681 | hist.windowSize = block.async.newHist.windowSize |
| 682 | if block.async.newHist.dict != nil { |
| 683 | hist.setDict(block.async.newHist.dict) |
| 684 | } |
| 685 | } |
| 686 | if block.err != nil || block.Type != blockTypeCompressed { |
| 687 | hasErr = block.err != nil |
| 688 | seqExecute <- block |
| 689 | continue |
| 690 | } |
| 691 | |
| 692 | hist.decoders.literals = block.async.literals |
| 693 | block.err = block.prepareSequences(block.async.seqData, &hist) |
| 694 | if debugDecoder && block.err != nil { |
| 695 | println("prepareSequences returned:", block.err) |
| 696 | } |
| 697 | hasErr = block.err != nil |
| 698 | if block.err == nil { |
| 699 | block.err = block.decodeSequences(&hist) |
| 700 | if debugDecoder && block.err != nil { |
| 701 | println("decodeSequences returned:", block.err) |
| 702 | } |
| 703 | hasErr = block.err != nil |
| 704 | // block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs] |
| 705 | block.async.seqSize = hist.decoders.seqSize |
| 706 | } |
| 707 | seqExecute <- block |
| 708 | } |
| 709 | close(seqExecute) |
| 710 | hist.reset() |
| 711 | }() |
| 712 |
no test coverage detected