MCPcopy Index your code
hub / github.com/rclone/rclone / WriteChunk

Method WriteChunk

backend/oracleobjectstorage/multipart.go:130–202  ·  view source on GitHub ↗

WriteChunk will write chunk number with reader bytes, where chunk number >= 0

(ctx context.Context, chunkNumber int, reader io.ReadSeeker)

Source from the content-addressed store, hash-verified

128
129// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
130func (w *objectChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error) {
131 if chunkNumber < 0 {
132 err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
133 return -1, err
134 }
135 // Only account after the checksum reads have been done
136 if do, ok := reader.(pool.DelayAccountinger); ok {
137 // To figure out this number, do a transfer and if the accounted size is 0 or a
138 // multiple of what it should be, increase or decrease this number.
139 do.DelayAccounting(2)
140 }
141 m := md5.New()
142 currentChunkSize, err := io.Copy(m, reader)
143 if err != nil {
144 return -1, err
145 }
146 // If no data read, don't write the chunk
147 if currentChunkSize == 0 {
148 return 0, nil
149 }
150 md5sumBinary := m.Sum([]byte{})
151 w.addMd5(&md5sumBinary, int64(chunkNumber))
152 md5sum := base64.StdEncoding.EncodeToString(md5sumBinary)
153
154 // Object storage requires 1 <= PartNumber <= 10000
155 ossPartNumber := chunkNumber + 1
156 if existing, ok := w.existingParts[ossPartNumber]; ok {
157 if md5sum == *existing.Md5 {
158 fs.Debugf(w.o, "matched uploaded part found, part num %d, skipping part, md5=%v", *existing.PartNumber, md5sum)
159 w.addCompletedPart(existing.PartNumber, existing.Etag)
160 return currentChunkSize, nil
161 }
162 }
163 req := objectstorage.UploadPartRequest{
164 NamespaceName: common.String(w.f.opt.Namespace),
165 BucketName: w.bucket,
166 ObjectName: w.key,
167 UploadId: w.uploadID,
168 UploadPartNum: common.Int(ossPartNumber),
169 ContentLength: common.Int64(currentChunkSize),
170 ContentMD5: common.String(md5sum),
171 }
172 w.o.applyPartUploadOptions(w.ui.req, &req)
173 var resp objectstorage.UploadPartResponse
174 err = w.f.pacer.Call(func() (bool, error) {
175 // req.UploadPartBody = io.NopCloser(bytes.NewReader(buf))
176 // rewind the reader on retry and after reading md5
177 _, err = reader.Seek(0, io.SeekStart)
178 if err != nil {
179 return false, err
180 }
181 req.UploadPartBody = io.NopCloser(reader)
182 resp, err = w.f.srv.UploadPart(ctx, req)
183 if err != nil {
184 if ossPartNumber <= 8 {
185 return shouldRetry(ctx, resp.HTTPResponse(), err)
186 }
187 if fserrors.ContextError(ctx, &err) {

Callers

nothing calls this directly

Calls 15

addMd5Method · 0.95
addCompletedPartMethod · 0.95
DebugfFunction · 0.92
ContextErrorFunction · 0.92
ErrorfFunction · 0.92
shouldRetryFunction · 0.70
DelayAccountingMethod · 0.65
CopyMethod · 0.65
EncodeToStringMethod · 0.65
StringMethod · 0.65
SeekMethod · 0.65

Tested by

no test coverage detected