(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor, metadataComp compression.Name, splitterName string)
| 235 | } |
| 236 | |
| 237 | func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor, metadataComp compression.Name, splitterName string) (*snapshot.DirEntry, error) { |
| 238 | file, err := f.Open(ctx) |
| 239 | if err != nil { |
| 240 | return nil, errors.Wrap(err, "unable to open file") |
| 241 | } |
| 242 | defer file.Close() //nolint:errcheck |
| 243 | |
| 244 | writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ |
| 245 | Description: "FILE:" + fname, |
| 246 | Compressor: compressor, |
| 247 | MetadataCompressor: metadataComp, |
| 248 | Splitter: splitterName, |
| 249 | AsyncWrites: 1, // upload chunk in parallel to writing another chunk |
| 250 | }) |
| 251 | defer writer.Close() //nolint:errcheck |
| 252 | |
| 253 | parentCheckpointRegistry.addCheckpointCallback(fname, func() (*snapshot.DirEntry, error) { |
| 254 | checkpointID, err := writer.Checkpoint() |
| 255 | if err != nil { |
| 256 | return nil, errors.Wrap(err, "checkpoint error") |
| 257 | } |
| 258 | |
| 259 | if checkpointID == object.EmptyID { |
| 260 | return nil, nil |
| 261 | } |
| 262 | |
| 263 | return newDirEntry(f, fname, checkpointID) |
| 264 | }) |
| 265 | |
| 266 | defer parentCheckpointRegistry.removeCheckpointCallback(fname) |
| 267 | |
| 268 | if offset != 0 { |
| 269 | if _, serr := file.Seek(offset, io.SeekStart); serr != nil { |
| 270 | return nil, errors.Wrap(serr, "seek error") |
| 271 | } |
| 272 | } |
| 273 | |
| 274 | var s io.Reader = file |
| 275 | if length >= 0 { |
| 276 | s = io.LimitReader(s, length) |
| 277 | } |
| 278 | |
| 279 | written, err := u.copyWithProgress(writer, s) |
| 280 | if err != nil { |
| 281 | return nil, err |
| 282 | } |
| 283 | |
| 284 | r, err := writer.Result() |
| 285 | if err != nil { |
| 286 | return nil, errors.Wrap(err, "unable to get result") |
| 287 | } |
| 288 | |
| 289 | de, err := newDirEntry(f, fname, r) |
| 290 | if err != nil { |
| 291 | return nil, errors.Wrap(err, "unable to create dir entry") |
| 292 | } |
| 293 | |
| 294 | de.FileSize = written |
no test coverage detected