put in to the remote path
(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn)
| 1439 | |
| 1440 | // put in to the remote path |
| 1441 | func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (fs.Object, error) { |
| 1442 | var err error |
| 1443 | var obj fs.Object |
| 1444 | |
| 1445 | // queue for upload and store in temp fs if configured |
| 1446 | if f.opt.TempWritePath != "" { |
| 1447 | // we need to clear the caches before a put through temp fs |
| 1448 | parentCd := NewDirectory(f, cleanPath(path.Dir(src.Remote()))) |
| 1449 | _ = f.cache.ExpireDir(parentCd) |
| 1450 | f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) |
| 1451 | |
| 1452 | obj, err = f.tempFs.Put(ctx, in, src, options...) |
| 1453 | if err != nil { |
| 1454 | fs.Errorf(obj, "put: failed to upload in temp fs: %v", err) |
| 1455 | return nil, err |
| 1456 | } |
| 1457 | fs.Infof(obj, "put: uploaded in temp fs") |
| 1458 | err = f.cache.addPendingUpload(path.Join(f.Root(), src.Remote()), false) |
| 1459 | if err != nil { |
| 1460 | fs.Errorf(obj, "put: failed to queue for upload: %v", err) |
| 1461 | return nil, err |
| 1462 | } |
| 1463 | fs.Infof(obj, "put: queued for upload") |
| 1464 | // if cache writes is enabled write it first through cache |
| 1465 | } else if f.opt.StoreWrites { |
| 1466 | f.cacheReader(in, src, func(inn io.Reader) { |
| 1467 | obj, err = put(ctx, inn, src, options...) |
| 1468 | }) |
| 1469 | if err == nil { |
| 1470 | fs.Debugf(obj, "put: uploaded to remote fs and saved in cache") |
| 1471 | } |
| 1472 | // last option: save it directly in remote fs |
| 1473 | } else { |
| 1474 | obj, err = put(ctx, in, src, options...) |
| 1475 | if err == nil { |
| 1476 | fs.Debugf(obj, "put: uploaded to remote fs") |
| 1477 | } |
| 1478 | } |
| 1479 | // validate and stop if errors are found |
| 1480 | if err != nil { |
| 1481 | fs.Errorf(src, "put: error uploading: %v", err) |
| 1482 | return nil, err |
| 1483 | } |
| 1484 | |
| 1485 | // cache the new file |
| 1486 | cachedObj := ObjectFromOriginal(ctx, f, obj) |
| 1487 | |
| 1488 | // deleting cached chunks and info to be replaced with new ones |
| 1489 | _ = f.cache.RemoveObject(cachedObj.abs()) |
| 1490 | |
| 1491 | cachedObj.persist() |
| 1492 | fs.Debugf(cachedObj, "put: added to cache") |
| 1493 | |
| 1494 | // expire parent |
| 1495 | parentCd := NewDirectory(f, cleanPath(path.Dir(cachedObj.Remote()))) |
| 1496 | err = f.cache.ExpireDir(parentCd) |
| 1497 | if err != nil { |
| 1498 | fs.Errorf(cachedObj, "put: cache expire error: %v", err) |
no test coverage detected