DecodeConcurrent will decode the full stream to w. This function should not be combined with reading, seeking or other operations. Up to 'concurrent' goroutines will be used. If <= 0, runtime.NumCPU will be used. On success the number of bytes decompressed and nil is returned. This is mainly intended for bigger streams.
(w io.Writer, concurrent int)
| 406 | // On success the number of bytes decompressed and nil is returned. |
| 407 | // This is mainly intended for bigger streams. |
| 408 | func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) { |
| 409 | if r.i > 0 || r.j > 0 || r.blockStart > 0 { |
| 410 | return 0, errors.New("DecodeConcurrent called after ") |
| 411 | } |
| 412 | if concurrent <= 0 { |
| 413 | concurrent = runtime.NumCPU() |
| 414 | } |
| 415 | |
| 416 | // Write to output |
| 417 | var errMu sync.Mutex |
| 418 | var aErr error |
| 419 | setErr := func(e error) (ok bool) { |
| 420 | errMu.Lock() |
| 421 | defer errMu.Unlock() |
| 422 | if e == nil { |
| 423 | return aErr == nil |
| 424 | } |
| 425 | if aErr == nil { |
| 426 | aErr = e |
| 427 | } |
| 428 | return false |
| 429 | } |
| 430 | hasErr := func() (ok bool) { |
| 431 | errMu.Lock() |
| 432 | v := aErr != nil |
| 433 | errMu.Unlock() |
| 434 | return v |
| 435 | } |
| 436 | |
| 437 | var aWritten int64 |
| 438 | toRead := make(chan []byte, concurrent) |
| 439 | writtenBlocks := make(chan []byte, concurrent) |
| 440 | queue := make(chan chan []byte, concurrent) |
| 441 | reUse := make(chan chan []byte, concurrent) |
| 442 | for i := 0; i < concurrent; i++ { |
| 443 | toRead <- make([]byte, 0, r.maxBufSize) |
| 444 | writtenBlocks <- make([]byte, 0, r.maxBufSize) |
| 445 | reUse <- make(chan []byte, 1) |
| 446 | } |
| 447 | // Writer |
| 448 | var wg sync.WaitGroup |
| 449 | wg.Add(1) |
| 450 | go func() { |
| 451 | defer wg.Done() |
| 452 | for toWrite := range queue { |
| 453 | entry := <-toWrite |
| 454 | reUse <- toWrite |
| 455 | if hasErr() || entry == nil { |
| 456 | if entry != nil { |
| 457 | writtenBlocks <- entry |
| 458 | } |
| 459 | continue |
| 460 | } |
| 461 | if hasErr() { |
| 462 | writtenBlocks <- entry |
| 463 | continue |
| 464 | } |
| 465 | n, err := w.Write(entry) |