WriteChunk uploads the bytes read from reader as the chunk with the given chunk number, which must be >= 0
(ctx context.Context, chunkNumber int, reader io.ReadSeeker)
| 128 | |
| 129 | // WriteChunk will write chunk number with reader bytes, where chunk number >= 0 |
| 130 | func (w *objectChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error) { |
| 131 | if chunkNumber < 0 { |
| 132 | err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber) |
| 133 | return -1, err |
| 134 | } |
| 135 | // Only account after the checksum reads have been done |
| 136 | if do, ok := reader.(pool.DelayAccountinger); ok { |
| 137 | // To figure out this number, do a transfer and if the accounted size is 0 or a |
| 138 | // multiple of what it should be, increase or decrease this number. |
| 139 | do.DelayAccounting(2) |
| 140 | } |
| 141 | m := md5.New() |
| 142 | currentChunkSize, err := io.Copy(m, reader) |
| 143 | if err != nil { |
| 144 | return -1, err |
| 145 | } |
| 146 | // If no data read, don't write the chunk |
| 147 | if currentChunkSize == 0 { |
| 148 | return 0, nil |
| 149 | } |
| 150 | md5sumBinary := m.Sum([]byte{}) |
| 151 | w.addMd5(&md5sumBinary, int64(chunkNumber)) |
| 152 | md5sum := base64.StdEncoding.EncodeToString(md5sumBinary) |
| 153 | |
| 154 | // Object storage requires 1 <= PartNumber <= 10000 |
| 155 | ossPartNumber := chunkNumber + 1 |
| 156 | if existing, ok := w.existingParts[ossPartNumber]; ok { |
| 157 | if md5sum == *existing.Md5 { |
| 158 | fs.Debugf(w.o, "matched uploaded part found, part num %d, skipping part, md5=%v", *existing.PartNumber, md5sum) |
| 159 | w.addCompletedPart(existing.PartNumber, existing.Etag) |
| 160 | return currentChunkSize, nil |
| 161 | } |
| 162 | } |
| 163 | req := objectstorage.UploadPartRequest{ |
| 164 | NamespaceName: common.String(w.f.opt.Namespace), |
| 165 | BucketName: w.bucket, |
| 166 | ObjectName: w.key, |
| 167 | UploadId: w.uploadID, |
| 168 | UploadPartNum: common.Int(ossPartNumber), |
| 169 | ContentLength: common.Int64(currentChunkSize), |
| 170 | ContentMD5: common.String(md5sum), |
| 171 | } |
| 172 | w.o.applyPartUploadOptions(w.ui.req, &req) |
| 173 | var resp objectstorage.UploadPartResponse |
| 174 | err = w.f.pacer.Call(func() (bool, error) { |
| 175 | // req.UploadPartBody = io.NopCloser(bytes.NewReader(buf)) |
| 176 | // rewind the reader on retry and after reading md5 |
| 177 | _, err = reader.Seek(0, io.SeekStart) |
| 178 | if err != nil { |
| 179 | return false, err |
| 180 | } |
| 181 | req.UploadPartBody = io.NopCloser(reader) |
| 182 | resp, err = w.f.srv.UploadPart(ctx, req) |
| 183 | if err != nil { |
| 184 | if ossPartNumber <= 8 { |
| 185 | return shouldRetry(ctx, resp.HTTPResponse(), err) |
| 186 | } |
| 187 | if fserrors.ContextError(ctx, &err) { |
nothing calls this directly
no test coverage detected