cacheReader will split the stream of a reader to be cached at the same time it is read by the original source
(u io.Reader, src fs.ObjectInfo, originalRead func(inn io.Reader))
| 1373 | |
| 1374 | // cacheReader will split the stream of a reader to be cached at the same time it is read by the original source |
| 1375 | func (f *Fs) cacheReader(u io.Reader, src fs.ObjectInfo, originalRead func(inn io.Reader)) { |
| 1376 | // create the pipe and tee reader |
| 1377 | pr, pw := io.Pipe() |
| 1378 | tr := io.TeeReader(u, pw) |
| 1379 | |
| 1380 | // create channel to synchronize |
| 1381 | done := make(chan bool) |
| 1382 | defer close(done) |
| 1383 | |
| 1384 | go func() { |
| 1385 | // notify the cache reader that we're complete after the source FS finishes |
| 1386 | defer func() { |
| 1387 | _ = pw.Close() |
| 1388 | }() |
| 1389 | // process original reading |
| 1390 | originalRead(tr) |
| 1391 | // signal complete |
| 1392 | done <- true |
| 1393 | }() |
| 1394 | |
| 1395 | go func() { |
| 1396 | var offset int64 |
| 1397 | for { |
| 1398 | chunk := make([]byte, f.opt.ChunkSize) |
| 1399 | readSize, err := io.ReadFull(pr, chunk) |
| 1400 | // we ignore 3 failures which are ok: |
| 1401 | // 1. EOF - original reading finished and we got a full buffer too |
| 1402 | // 2. ErrUnexpectedEOF - original reading finished and partial buffer |
| 1403 | // 3. ErrClosedPipe - source remote reader was closed (usually means it reached the end) and we need to stop too |
| 1404 | // if we have a different error: we're going to error out the original reading too and stop this |
| 1405 | if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF && err != io.ErrClosedPipe { |
| 1406 | fs.Errorf(src, "error saving new data in cache. offset: %v, err: %v", offset, err) |
| 1407 | _ = pr.CloseWithError(err) |
| 1408 | break |
| 1409 | } |
| 1410 | // if we have some bytes we cache them |
| 1411 | if readSize > 0 { |
| 1412 | chunk = chunk[:readSize] |
| 1413 | err2 := f.cache.AddChunk(cleanPath(path.Join(f.root, src.Remote())), chunk, offset) |
| 1414 | if err2 != nil { |
| 1415 | fs.Errorf(src, "error saving new data in cache '%v'", err2) |
| 1416 | _ = pr.CloseWithError(err2) |
| 1417 | break |
| 1418 | } |
| 1419 | offset += int64(readSize) |
| 1420 | } |
| 1421 | // stuff should be closed but let's be sure |
| 1422 | if err == io.EOF || err == io.ErrUnexpectedEOF || err == io.ErrClosedPipe { |
| 1423 | _ = pr.Close() |
| 1424 | break |
| 1425 | } |
| 1426 | } |
| 1427 | |
| 1428 | // signal complete |
| 1429 | done <- true |
| 1430 | }() |
| 1431 | |
| 1432 | // wait until both are done |