(ctx context.Context)
| 212 | } |
| 213 | |
| 214 | func (w *objectChunkWriter) Close(ctx context.Context) (err error) { |
| 215 | req := objectstorage.CommitMultipartUploadRequest{ |
| 216 | NamespaceName: common.String(w.f.opt.Namespace), |
| 217 | BucketName: w.bucket, |
| 218 | ObjectName: w.key, |
| 219 | UploadId: w.uploadID, |
| 220 | } |
| 221 | req.PartsToCommit = w.partsToCommit |
| 222 | var resp objectstorage.CommitMultipartUploadResponse |
| 223 | err = w.f.pacer.Call(func() (bool, error) { |
| 224 | resp, err = w.f.srv.CommitMultipartUpload(ctx, req) |
| 225 | // if multipart is corrupted, we will abort the uploadId |
| 226 | if isMultiPartUploadCorrupted(err) { |
| 227 | fs.Debugf(w.o, "multipart uploadId %v is corrupted, aborting...", *w.uploadID) |
| 228 | _ = w.Abort(ctx) |
| 229 | return false, err |
| 230 | } |
| 231 | return shouldRetry(ctx, resp.HTTPResponse(), err) |
| 232 | }) |
| 233 | if err != nil { |
| 234 | return err |
| 235 | } |
| 236 | w.eTag = *resp.ETag |
| 237 | hashOfHashes := md5.Sum(w.md5s) |
| 238 | wantMultipartMd5 := fmt.Sprintf("%s-%d", base64.StdEncoding.EncodeToString(hashOfHashes[:]), len(w.partsToCommit)) |
| 239 | gotMultipartMd5 := *resp.OpcMultipartMd5 |
| 240 | if wantMultipartMd5 != gotMultipartMd5 { |
| 241 | fs.Errorf(w.o, "multipart upload corrupted: multipart md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5) |
| 242 | return fmt.Errorf("multipart upload corrupted: md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5) |
| 243 | } |
| 244 | fs.Debugf(w.o, "multipart upload %v md5 matched: expecting %s and got %s", *w.uploadID, wantMultipartMd5, gotMultipartMd5) |
| 245 | return nil |
| 246 | } |
| 247 | |
| 248 | func isMultiPartUploadCorrupted(err error) bool { |
| 249 | if err == nil { |
nothing calls this directly
no test coverage detected