MCPcopy
hub / github.com/EverythingSuckz/TG-FileStreamBot / prefetch

Method prefetch

internal/stream/pipe.go:142–242  ·  view source on GitHub ↗

prefetch runs in a goroutine, fetching blocks concurrently in batches and sending them to blockQueue.

()

Source from the content-addressed store, hash-verified

140
141// prefetch runs in a goroutine, fetching blocks concurrently and sending to blockQueue.
142func (p *StreamPipe) prefetch() {
143 defer close(p.blockQueue)
144
145 // calc block boundaries
146 alignedStart := p.start - (p.start % p.blockSize)
147 leftTrim := p.start - alignedStart
148 rightTrim := (p.end % p.blockSize) + 1
149 totalBlocks := int((p.end - alignedStart + p.blockSize) / p.blockSize)
150
151 currentBlock := 0
152 offset := alignedStart
153
154 for currentBlock < totalBlocks {
155 // check for cancellation
156 select {
157 case <-p.ctx.Done():
158 return
159 default:
160 }
161
162 // fetch a batch of blocks concurrently
163 batchSize := min(config.ValueOf.StreamConcurrency, totalBlocks-currentBlock)
164 blocks := make([][]byte, batchSize)
165
166 var wg sync.WaitGroup
167 var fetchErr error
168 var errMu sync.Mutex
169
170 for i := range batchSize {
171 wg.Add(1)
172 go func(idx int) {
173 defer wg.Done()
174
175 blockNum := currentBlock + idx
176 blockOffset := offset + int64(idx)*p.blockSize
177
178 data, err := p.downloadBlockWithRetry(blockOffset)
179 dataLen := int64(len(data))
180
181 if err != nil {
182 errMu.Lock()
183 if fetchErr == nil {
184 fetchErr = err
185 }
186 errMu.Unlock()
187 return
188 }
189
190 // trim first/last block to exact range
191 if totalBlocks == 1 {
192 if dataLen < rightTrim {
193 rightTrim = dataLen
194 }
195 if leftTrim > dataLen {
196 leftTrim = dataLen
197 }
198 data = data[leftTrim:rightTrim]
199 } else if blockNum == 0 {

Callers 1

NewStreamPipe (Function) · 0.95

Calls 2

Add (Method) · 0.80

Tested by

no test coverage detected