MCPcopy
hub / github.com/rclone/rclone / multiThreadCopy

Function multiThreadCopy

fs/operations/multithread.go:124–289  ·  view source on GitHub ↗

Copy src to (f, remote) using streams download threads. It tries to use the OpenChunkWriter feature and if that's not available it creates an adapter using OpenWriterAt

(ctx context.Context, f fs.Fs, remote string, src fs.Object, concurrency int, tr *accounting.Transfer, options ...fs.OpenOption)

Source from the content-addressed store, hash-verified

122// Copy src to (f, remote) using streams download threads. It tries to use the OpenChunkWriter feature
123// and if that's not available it creates an adapter using OpenWriterAt
124func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, concurrency int, tr *accounting.Transfer, options ...fs.OpenOption) (newDst fs.Object, err error) {
125 openChunkWriter := f.Features().OpenChunkWriter
126 ci := fs.GetConfig(ctx)
127 noBuffering := false
128 usingOpenWriterAt := false
129 if openChunkWriter == nil {
130 openWriterAt := f.Features().OpenWriterAt
131 if openWriterAt == nil {
132 return nil, errors.New("multi-thread copy: neither OpenChunkWriter nor OpenWriterAt supported")
133 }
134 openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, int64(ci.MultiThreadChunkSize), int64(ci.MultiThreadWriteBufferSize), f)
135 // If we are using OpenWriterAt we don't seek the chunks so don't need to buffer
136 fs.Debugf(src, "multi-thread copy: disabling buffering because destination uses OpenWriterAt")
137 noBuffering = true
138 usingOpenWriterAt = true
139 } else if src.Fs().Features().IsLocal {
140 // If the source fs is local we don't need to buffer
141 fs.Debugf(src, "multi-thread copy: disabling buffering because source is local disk")
142 noBuffering = true
143 } else if f.Features().ChunkWriterDoesntSeek {
144 // If the destination Fs promises not to seek its chunks
145 // (except for retries) then we don't need buffering.
146 fs.Debugf(src, "multi-thread copy: disabling buffering because destination has set ChunkWriterDoesntSeek")
147 noBuffering = true
148 }
149
150 if src.Size() < 0 {
151 return nil, fmt.Errorf("multi-thread copy: can't copy unknown sized file")
152 }
153 if src.Size() == 0 {
154 return nil, fmt.Errorf("multi-thread copy: can't copy zero sized file")
155 }
156
157 info, chunkWriter, err := openChunkWriter(ctx, remote, src, options...)
158 if err != nil {
159 return nil, fmt.Errorf("multi-thread copy: failed to open chunk writer: %w", err)
160 }
161
162 uploadCtx, cancel := context.WithCancel(ctx)
163 defer cancel()
164 uploadedOK := false
165 defer atexit.OnError(&err, func() {
166 cancel()
167 if info.LeavePartsOnError || uploadedOK {
168 return
169 }
170 fs.Debugf(src, "multi-thread copy: cancelling transfer on exit")
171 abortErr := chunkWriter.Abort(ctx)
172 if abortErr != nil {
173 fs.Debugf(src, "multi-thread copy: abort failed: %v", abortErr)
174 }
175 })()
176
177 if info.ChunkSize > src.Size() {
178 fs.Debugf(src, "multi-thread copy: chunk size %v was bigger than source file size %v", fs.SizeSuffix(info.ChunkSize), fs.SizeSuffix(src.Size()))
179 info.ChunkSize = src.Size()
180 }
181

Callers 3

multiThreadCopyMethod · 0.85
TestMultithreadCopyFunction · 0.85
TestMultithreadCopyAbortFunction · 0.85

Calls 15

copyChunkMethod · 0.95
SetModTimeMethod · 0.95
GetConfigFunction · 0.92
DebugfFunction · 0.92
OnErrorFunction · 0.92
SizeSuffixTypeAlias · 0.92
NewRWFunction · 0.92
GetMetadataOptionsFunction · 0.92
ErrorfFunction · 0.92
calculateNumChunksFunction · 0.85
AccountMethod · 0.80

Tested by 2

TestMultithreadCopyFunction · 0.68
TestMultithreadCopyAbortFunction · 0.68

Used in the wild real call sites across dependent graphs

searching dependent graphs…