(p []byte)
| 423 | } |
| 424 | |
| 425 | func (pw *pushWriter) Write(p []byte) (n int, err error) { |
| 426 | status, err := pw.tracker.GetStatus(pw.ref) |
| 427 | if err != nil { |
| 428 | return n, err |
| 429 | } |
| 430 | |
| 431 | if pw.pipe == nil { |
| 432 | select { |
| 433 | case <-pw.done: |
| 434 | return 0, io.ErrClosedPipe |
| 435 | case p := <-pw.pipeC: |
| 436 | pw.replacePipe(p) |
| 437 | } |
| 438 | } else { |
| 439 | select { |
| 440 | case <-pw.done: |
| 441 | return 0, io.ErrClosedPipe |
| 442 | case p := <-pw.pipeC: |
| 443 | return 0, pw.replacePipe(p) |
| 444 | default: |
| 445 | } |
| 446 | } |
| 447 | |
| 448 | n, err = pw.pipe.Write(p) |
| 449 | if errors.Is(err, io.ErrClosedPipe) { |
| 450 | // if the pipe is closed, we might have the original error on the error |
| 451 | // channel - so we should try and get it |
| 452 | select { |
| 453 | case <-pw.done: |
| 454 | case err = <-pw.errC: |
| 455 | pw.Close() |
| 456 | case p := <-pw.pipeC: |
| 457 | return 0, pw.replacePipe(p) |
| 458 | case resp := <-pw.respC: |
| 459 | pw.setResponse(resp) |
| 460 | } |
| 461 | } |
| 462 | status.Offset += int64(n) |
| 463 | status.UpdatedAt = time.Now() |
| 464 | pw.tracker.SetStatus(pw.ref, status) |
| 465 | return |
| 466 | } |
| 467 | |
| 468 | func (pw *pushWriter) Close() error { |
| 469 | // Ensure pipeC is closed but handle `Close()` being |
nothing calls this directly
no test coverage detected