MCPcopy
hub / github.com/klauspost/compress / DecodeConcurrent

Method DecodeConcurrent

s2/reader.go:408–662  ·  view source on GitHub ↗

DecodeConcurrent will decode the full stream to w. This function should not be combined with reading, seeking or other operations. Up to 'concurrent' goroutines will be used. If <= 0, runtime.NumCPU will be used. On success the number of bytes decompressed nil and is returned. This is mainly intende

(w io.Writer, concurrent int)

Source from the content-addressed store, hash-verified

406// On success the number of bytes decompressed nil and is returned.
407// This is mainly intended for bigger streams.
408func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
409 if r.i > 0 || r.j > 0 || r.blockStart > 0 {
410 return 0, errors.New("DecodeConcurrent called after ")
411 }
412 if concurrent <= 0 {
413 concurrent = runtime.NumCPU()
414 }
415
416 // Write to output
417 var errMu sync.Mutex
418 var aErr error
419 setErr := func(e error) (ok bool) {
420 errMu.Lock()
421 defer errMu.Unlock()
422 if e == nil {
423 return aErr == nil
424 }
425 if aErr == nil {
426 aErr = e
427 }
428 return false
429 }
430 hasErr := func() (ok bool) {
431 errMu.Lock()
432 v := aErr != nil
433 errMu.Unlock()
434 return v
435 }
436
437 var aWritten int64
438 toRead := make(chan []byte, concurrent)
439 writtenBlocks := make(chan []byte, concurrent)
440 queue := make(chan chan []byte, concurrent)
441 reUse := make(chan chan []byte, concurrent)
442 for i := 0; i < concurrent; i++ {
443 toRead <- make([]byte, 0, r.maxBufSize)
444 writtenBlocks <- make([]byte, 0, r.maxBufSize)
445 reUse <- make(chan []byte, 1)
446 }
447 // Writer
448 var wg sync.WaitGroup
449 wg.Add(1)
450 go func() {
451 defer wg.Done()
452 for toWrite := range queue {
453 entry := <-toWrite
454 reUse <- toWrite
455 if hasErr() || entry == nil {
456 if entry != nil {
457 writtenBlocks <- entry
458 }
459 continue
460 }
461 if hasErr() {
462 writtenBlocks <- entry
463 continue
464 }
465 n, err := w.Write(entry)

Callers 6

TestDecoderMaxBlockSizeFunction · 0.95
FuzzStreamDecodeFunction · 0.95
mainFunction · 0.95
mainFunction · 0.95
mainFunction · 0.95
verifyToFunction · 0.95

Calls 6

readFullMethod · 0.95
skippableMethod · 0.95
DecodedLenFunction · 0.70
DecodeFunction · 0.70
crcFunction · 0.70
WriteMethod · 0.65

Tested by 2

TestDecoderMaxBlockSizeFunction · 0.76
FuzzStreamDecodeFunction · 0.76