Copy src to (f, remote) using concurrency download threads. It tries to use the OpenChunkWriter feature and, if that's not available, it creates an adapter using OpenWriterAt.
(ctx context.Context, f fs.Fs, remote string, src fs.Object, concurrency int, tr *accounting.Transfer, options ...fs.OpenOption)
| 122 | // Copy src to (f, remote) using concurrency download threads. It tries to use the OpenChunkWriter feature |
| 123 | // and if that's not available it creates an adapter using OpenWriterAt |
| 124 | func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, concurrency int, tr *accounting.Transfer, options ...fs.OpenOption) (newDst fs.Object, err error) { |
| 125 | openChunkWriter := f.Features().OpenChunkWriter |
| 126 | ci := fs.GetConfig(ctx) |
| 127 | noBuffering := false |
| 128 | usingOpenWriterAt := false |
| 129 | if openChunkWriter == nil { |
| 130 | openWriterAt := f.Features().OpenWriterAt |
| 131 | if openWriterAt == nil { |
| 132 | return nil, errors.New("multi-thread copy: neither OpenChunkWriter nor OpenWriterAt supported") |
| 133 | } |
| 134 | openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, int64(ci.MultiThreadChunkSize), int64(ci.MultiThreadWriteBufferSize), f) |
| 135 | // If we are using OpenWriterAt we don't seek the chunks so don't need to buffer |
| 136 | fs.Debugf(src, "multi-thread copy: disabling buffering because destination uses OpenWriterAt") |
| 137 | noBuffering = true |
| 138 | usingOpenWriterAt = true |
| 139 | } else if src.Fs().Features().IsLocal { |
| 140 | // If the source fs is local we don't need to buffer |
| 141 | fs.Debugf(src, "multi-thread copy: disabling buffering because source is local disk") |
| 142 | noBuffering = true |
| 143 | } else if f.Features().ChunkWriterDoesntSeek { |
| 144 | // If the destination Fs promises not to seek its chunks |
| 145 | // (except for retries) then we don't need buffering. |
| 146 | fs.Debugf(src, "multi-thread copy: disabling buffering because destination has set ChunkWriterDoesntSeek") |
| 147 | noBuffering = true |
| 148 | } |
| 149 | |
| 150 | if src.Size() < 0 { |
| 151 | return nil, fmt.Errorf("multi-thread copy: can't copy unknown sized file") |
| 152 | } |
| 153 | if src.Size() == 0 { |
| 154 | return nil, fmt.Errorf("multi-thread copy: can't copy zero sized file") |
| 155 | } |
| 156 | |
| 157 | info, chunkWriter, err := openChunkWriter(ctx, remote, src, options...) |
| 158 | if err != nil { |
| 159 | return nil, fmt.Errorf("multi-thread copy: failed to open chunk writer: %w", err) |
| 160 | } |
| 161 | |
| 162 | uploadCtx, cancel := context.WithCancel(ctx) |
| 163 | defer cancel() |
| 164 | uploadedOK := false |
| 165 | defer atexit.OnError(&err, func() { |
| 166 | cancel() |
| 167 | if info.LeavePartsOnError || uploadedOK { |
| 168 | return |
| 169 | } |
| 170 | fs.Debugf(src, "multi-thread copy: cancelling transfer on exit") |
| 171 | abortErr := chunkWriter.Abort(ctx) |
| 172 | if abortErr != nil { |
| 173 | fs.Debugf(src, "multi-thread copy: abort failed: %v", abortErr) |
| 174 | } |
| 175 | })() |
| 176 | |
| 177 | if info.ChunkSize > src.Size() { |
| 178 | fs.Debugf(src, "multi-thread copy: chunk size %v was bigger than source file size %v", fs.SizeSuffix(info.ChunkSize), fs.SizeSuffix(src.Size())) |
| 179 | info.ChunkSize = src.Size() |
| 180 | } |
| 181 |
searching dependent graphs…