MCPcopy
hub / github.com/rclone/rclone / cacheReader

Method cacheReader

backend/cache/cache.go:1375–1436  ·  view source on GitHub ↗

cacheReader splits the stream of a reader so that the data is written to the cache at the same time as it is read by the original source.

(u io.Reader, src fs.ObjectInfo, originalRead func(inn io.Reader))

Source from the content-addressed store, hash-verified

1373
1374// cacheReader will split the stream of a reader to be cached at the same time it is read by the original source
1375func (f *Fs) cacheReader(u io.Reader, src fs.ObjectInfo, originalRead func(inn io.Reader)) {
1376 // create the pipe and tee reader
1377 pr, pw := io.Pipe()
1378 tr := io.TeeReader(u, pw)
1379
1380 // create channel to synchronize
1381 done := make(chan bool)
1382 defer close(done)
1383
1384 go func() {
1385 // notify the cache reader that we're complete after the source FS finishes
1386 defer func() {
1387 _ = pw.Close()
1388 }()
1389 // process original reading
1390 originalRead(tr)
1391 // signal complete
1392 done <- true
1393 }()
1394
1395 go func() {
1396 var offset int64
1397 for {
1398 chunk := make([]byte, f.opt.ChunkSize)
1399 readSize, err := io.ReadFull(pr, chunk)
1400 // we ignore 3 failures which are ok:
1401 // 1. EOF - original reading finished and we got a full buffer too
1402 // 2. ErrUnexpectedEOF - original reading finished and partial buffer
1403 // 3. ErrClosedPipe - source remote reader was closed (usually means it reached the end) and we need to stop too
1404 // if we have a different error: we're going to error out the original reading too and stop this
1405 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF && err != io.ErrClosedPipe {
1406 fs.Errorf(src, "error saving new data in cache. offset: %v, err: %v", offset, err)
1407 _ = pr.CloseWithError(err)
1408 break
1409 }
1410 // if we have some bytes we cache them
1411 if readSize > 0 {
1412 chunk = chunk[:readSize]
1413 err2 := f.cache.AddChunk(cleanPath(path.Join(f.root, src.Remote())), chunk, offset)
1414 if err2 != nil {
1415 fs.Errorf(src, "error saving new data in cache '%v'", err2)
1416 _ = pr.CloseWithError(err2)
1417 break
1418 }
1419 offset += int64(readSize)
1420 }
1421 // stuff should be closed but let's be sure
1422 if err == io.EOF || err == io.ErrUnexpectedEOF || err == io.ErrClosedPipe {
1423 _ = pr.Close()
1424 break
1425 }
1426 }
1427
1428 // signal complete
1429 done <- true
1430 }()
1431
1432 // wait until both are done

Callers 1

putMethod · 0.95

Calls 7

ErrorfFunction · 0.92
closeFunction · 0.85
cleanPathFunction · 0.85
JoinMethod · 0.80
CloseMethod · 0.65
RemoteMethod · 0.65
AddChunkMethod · 0.45

Tested by

no test coverage detected