MCPcopy
hub / github.com/klauspost/compress / startStreamDecoder

Method startStreamDecoder

zstd/decoder.go:655–940  ·  view source on GitHub ↗

Create Decoder: ASYNC: Spawn 3 go routines. 0: Read frames and decode block literals. 1: Decode sequences. 2: Execute sequences, send to output.

(ctx context.Context, r io.Reader, output chan decodeOutput)

Source from the content-addressed store, hash-verified

653// 1: Decode sequences.
654// 2: Execute sequences, send to output.
655func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
656 defer d.streamWg.Done()
657 br := readerWrapper{r: r}
658
659 var seqDecode = make(chan *blockDec, d.o.concurrent)
660 var seqExecute = make(chan *blockDec, d.o.concurrent)
661
662 // Async 1: Decode sequences...
663 go func() {
664 var hist history
665 var hasErr bool
666
667 for block := range seqDecode {
668 if hasErr {
669 if block != nil {
670 seqExecute <- block
671 }
672 continue
673 }
674 if block.async.newHist != nil {
675 if debugDecoder {
676 println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
677 }
678 hist.reset()
679 hist.decoders = block.async.newHist.decoders
680 hist.recentOffsets = block.async.newHist.recentOffsets
681 hist.windowSize = block.async.newHist.windowSize
682 if block.async.newHist.dict != nil {
683 hist.setDict(block.async.newHist.dict)
684 }
685 }
686 if block.err != nil || block.Type != blockTypeCompressed {
687 hasErr = block.err != nil
688 seqExecute <- block
689 continue
690 }
691
692 hist.decoders.literals = block.async.literals
693 block.err = block.prepareSequences(block.async.seqData, &hist)
694 if debugDecoder && block.err != nil {
695 println("prepareSequences returned:", block.err)
696 }
697 hasErr = block.err != nil
698 if block.err == nil {
699 block.err = block.decodeSequences(&hist)
700 if debugDecoder && block.err != nil {
701 println("decodeSequences returned:", block.err)
702 }
703 hasErr = block.err != nil
704 // block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs]
705 block.async.seqSize = hist.decoders.seqSize
706 }
707 seqExecute <- block
708 }
709 close(seqExecute)
710 hist.reset()
711 }()
712

Callers 1

ResetMethod · 0.95

Calls 14

resetMethod · 0.95
setDictMethod · 0.95
appendMethod · 0.95
setDictMethod · 0.95
sendErrMethod · 0.95
printlnFunction · 0.85
printfFunction · 0.85
prepareSequencesMethod · 0.80
decodeSequencesMethod · 0.80
executeSequencesMethod · 0.80
decodeLiteralsMethod · 0.80
readSmallMethod · 0.65

Tested by

no test coverage detected