NewStreamPipe is a constructor function that creates a StreamPipe with default configuration.
( ctx context.Context, client *gotgproto.Client, location tg.InputFileLocationClass, start, end int64, log *zap.Logger, )
| 62 | |
| 63 | // class function which creates a StreamPipe with default configuration. |
| 64 | func NewStreamPipe( |
| 65 | ctx context.Context, |
| 66 | client *gotgproto.Client, |
| 67 | location tg.InputFileLocationClass, |
| 68 | start, end int64, |
| 69 | log *zap.Logger, |
| 70 | ) (io.ReadCloser, error) { |
| 71 | |
| 72 | if start > end { |
| 73 | return nil, fmt.Errorf("invalid range: start (%d) > end (%d)", start, end) |
| 74 | } |
| 75 | |
| 76 | ctx, cancel := context.WithCancel(ctx) |
| 77 | |
| 78 | totalBytes := end - start + 1 |
| 79 | blockSize := calculateBlockSize(start, end) |
| 80 | |
| 81 | p := &StreamPipe{ |
| 82 | ctx: ctx, |
| 83 | cancel: cancel, |
| 84 | log: log.Named("StreamPipe"), |
| 85 | client: client, |
| 86 | location: location, |
| 87 | start: start, |
| 88 | end: end, |
| 89 | blockSize: blockSize, |
| 90 | totalBytes: totalBytes, |
| 91 | blockQueue: make(chan []byte, config.ValueOf.StreamBufferCount), |
| 92 | } |
| 93 | |
| 94 | // start prefetching in background |
| 95 | go p.prefetch() |
| 96 | |
| 97 | return p, nil |
| 98 | } |
| 99 | |
| 100 | // Read implements io.Reader |
| 101 | func (p *StreamPipe) Read(buf []byte) (n int, err error) { |
no test coverage detected