updatePendingUpload allows to update an existing item in the queue while checking if it's not started in the same transaction. If it is started, it will not allow the update
(remote string, fn func(item *tempUploadInfo) error)
| 931 | // updatePendingUpload allows to update an existing item in the queue while checking if it's not started in the same |
| 932 | // transaction. If it is started, it will not allow the update |
| 933 | func (b *Persistent) updatePendingUpload(remote string, fn func(item *tempUploadInfo) error) error { |
| 934 | b.tempQueueMux.Lock() |
| 935 | defer b.tempQueueMux.Unlock() |
| 936 | |
| 937 | return b.db.Update(func(tx *bolt.Tx) error { |
| 938 | bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket)) |
| 939 | if err != nil { |
| 940 | return fmt.Errorf("couldn't bucket for %v", tempBucket) |
| 941 | } |
| 942 | |
| 943 | var tempObj = &tempUploadInfo{} |
| 944 | v := bucket.Get([]byte(remote)) |
| 945 | err = json.Unmarshal(v, tempObj) |
| 946 | if err != nil { |
| 947 | return fmt.Errorf("pending upload (%v) not found %v", remote, err) |
| 948 | } |
| 949 | if tempObj.Started { |
| 950 | return fmt.Errorf("pending upload already started %v", remote) |
| 951 | } |
| 952 | err = fn(tempObj) |
| 953 | if err != nil { |
| 954 | return err |
| 955 | } |
| 956 | if remote != tempObj.DestPath { |
| 957 | err := bucket.Delete([]byte(remote)) |
| 958 | if err != nil { |
| 959 | return err |
| 960 | } |
| 961 | // if this is removed then the entry can be removed too |
| 962 | if tempObj.DestPath == "" { |
| 963 | return nil |
| 964 | } |
| 965 | } |
| 966 | v2, err := json.Marshal(tempObj) |
| 967 | if err != nil { |
| 968 | return fmt.Errorf("pending upload not updated: %w", err) |
| 969 | } |
| 970 | err = bucket.Put([]byte(tempObj.DestPath), v2) |
| 971 | if err != nil { |
| 972 | return fmt.Errorf("pending upload not updated: %w", err) |
| 973 | } |
| 974 | |
| 975 | return nil |
| 976 | }) |
| 977 | } |
| 978 | |
| 979 | // ReconcileTempUploads will recursively look for all the files in the temp directory and add them to the queue |
| 980 | func (b *Persistent) ReconcileTempUploads(ctx context.Context, cacheFs *Fs) error { |