run is the main loop for the worker which receives offsets to preload
()
| 384 | |
| 385 | // run is the main loop for the worker which receives offsets to preload |
| 386 | func (w *worker) run() { |
| 387 | var err error |
| 388 | var data []byte |
| 389 | defer func() { |
| 390 | if w.rc != nil { |
| 391 | _ = w.rc.Close() |
| 392 | } |
| 393 | w.r.workersWg.Done() |
| 394 | }() |
| 395 | |
| 396 | for { |
| 397 | chunkStart, open := <-w.r.preloadQueue |
| 398 | if chunkStart < 0 || !open { |
| 399 | break |
| 400 | } |
| 401 | |
| 402 | // skip if it exists |
| 403 | if w.r.UseMemory { |
| 404 | if w.r.memory.HasChunk(w.r.cachedObject, chunkStart) { |
| 405 | continue |
| 406 | } |
| 407 | |
| 408 | // add it in ram if it's in the persistent storage |
| 409 | data, err = w.r.storage().GetChunk(w.r.cachedObject, chunkStart) |
| 410 | if err == nil { |
| 411 | err = w.r.memory.AddChunk(w.r.cachedObject.abs(), data, chunkStart) |
| 412 | if err != nil { |
| 413 | fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err) |
| 414 | } else { |
| 415 | continue |
| 416 | } |
| 417 | } |
| 418 | } else if w.r.storage().HasChunk(w.r.cachedObject, chunkStart) { |
| 419 | continue |
| 420 | } |
| 421 | |
| 422 | chunkEnd := chunkStart + int64(w.r.cacheFs().opt.ChunkSize) |
| 423 | // TODO: Remove this comment if it proves to be reliable for #1896 |
| 424 | //if chunkEnd > w.r.cachedObject.Size() { |
| 425 | // chunkEnd = w.r.cachedObject.Size() |
| 426 | //} |
| 427 | |
| 428 | w.download(chunkStart, chunkEnd, 0) |
| 429 | } |
| 430 | } |
| 431 | |
| 432 | func (w *worker) download(chunkStart, chunkEnd int64, retry int) { |
| 433 | var err error |