ReconcileTempUploads will recursively look for all the files in the temp directory and add them to the queue
(ctx context.Context, cacheFs *Fs)
| 978 | |
| 979 | // ReconcileTempUploads will recursively look for all the files in the temp directory and add them to the queue |
| 980 | func (b *Persistent) ReconcileTempUploads(ctx context.Context, cacheFs *Fs) error { |
| 981 | return b.db.Update(func(tx *bolt.Tx) error { |
| 982 | _ = tx.DeleteBucket([]byte(tempBucket)) |
| 983 | bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket)) |
| 984 | if err != nil { |
| 985 | return err |
| 986 | } |
| 987 | |
| 988 | var queuedEntries []fs.Object |
| 989 | err = walk.ListR(ctx, cacheFs.tempFs, "", true, -1, walk.ListObjects, func(entries fs.DirEntries) error { |
| 990 | for _, o := range entries { |
| 991 | if oo, ok := o.(fs.Object); ok { |
| 992 | queuedEntries = append(queuedEntries, oo) |
| 993 | } |
| 994 | } |
| 995 | return nil |
| 996 | }) |
| 997 | if err != nil { |
| 998 | return err |
| 999 | } |
| 1000 | |
| 1001 | fs.Debugf(cacheFs, "reconciling temporary uploads") |
| 1002 | for _, queuedEntry := range queuedEntries { |
| 1003 | destPath := path.Join(cacheFs.Root(), queuedEntry.Remote()) |
| 1004 | tempObj := &tempUploadInfo{ |
| 1005 | DestPath: destPath, |
| 1006 | AddedOn: time.Now(), |
| 1007 | Started: false, |
| 1008 | } |
| 1009 | |
| 1010 | // cache Object Info |
| 1011 | encoded, err := json.Marshal(tempObj) |
| 1012 | if err != nil { |
| 1013 | return fmt.Errorf("couldn't marshal object (%v) info: %v", queuedEntry, err) |
| 1014 | } |
| 1015 | err = bucket.Put([]byte(destPath), encoded) |
| 1016 | if err != nil { |
| 1017 | return fmt.Errorf("couldn't cache object (%v) info: %v", destPath, err) |
| 1018 | } |
| 1019 | fs.Debugf(cacheFs, "reconciled temporary upload: %v", destPath) |
| 1020 | } |
| 1021 | |
| 1022 | return nil |
| 1023 | }) |
| 1024 | } |
| 1025 | |
| 1026 | // Close should be called when the program ends gracefully |
| 1027 | func (b *Persistent) Close() { |