MCPcopy
hub / github.com/rclone/rclone / updatePendingUpload

Method updatePendingUpload

backend/cache/storage_persistent.go:933–977  ·  view source on GitHub ↗

updatePendingUpload updates an existing item in the queue, checking within the same transaction that the upload has not already started. If it has started, the update is not allowed.

(remote string, fn func(item *tempUploadInfo) error)

Source from the content-addressed store, hash-verified

931// updatePendingUpload allows to update an existing item in the queue while checking if it's not started in the same
932// transaction. If it is started, it will not allow the update
933func (b *Persistent) updatePendingUpload(remote string, fn func(item *tempUploadInfo) error) error {
934 b.tempQueueMux.Lock()
935 defer b.tempQueueMux.Unlock()
936
937 return b.db.Update(func(tx *bolt.Tx) error {
938 bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
939 if err != nil {
940 return fmt.Errorf("couldn't bucket for %v", tempBucket)
941 }
942
943 var tempObj = &tempUploadInfo{}
944 v := bucket.Get([]byte(remote))
945 err = json.Unmarshal(v, tempObj)
946 if err != nil {
947 return fmt.Errorf("pending upload (%v) not found %v", remote, err)
948 }
949 if tempObj.Started {
950 return fmt.Errorf("pending upload already started %v", remote)
951 }
952 err = fn(tempObj)
953 if err != nil {
954 return err
955 }
956 if remote != tempObj.DestPath {
957 err := bucket.Delete([]byte(remote))
958 if err != nil {
959 return err
960 }
961 // if this is removed then the entry can be removed too
962 if tempObj.DestPath == "" {
963 return nil
964 }
965 }
966 v2, err := json.Marshal(tempObj)
967 if err != nil {
968 return fmt.Errorf("pending upload not updated: %w", err)
969 }
970 err = bucket.Put([]byte(tempObj.DestPath), v2)
971 if err != nil {
972 return fmt.Errorf("pending upload not updated: %w", err)
973 }
974
975 return nil
976 })
977}
978
979// ReconcileTempUploads will recursively look for all the files in the temp directory and add them to the queue
980func (b *Persistent) ReconcileTempUploads(ctx context.Context, cacheFs *Fs) error {

Callers 2

MoveMethod · 0.80

Calls 7

LockMethod · 0.65
UnlockMethod · 0.65
UpdateMethod · 0.65
GetMethod · 0.65
DeleteMethod · 0.65
PutMethod · 0.65
ErrorfMethod · 0.45

Tested by 1