MCPcopy
hub / github.com/cortexlabs/cortex / S3FileIterator

Method S3FileIterator

pkg/lib/aws/s3.go:655–693  ·  view source on GitHub ↗
(bucket string, s3Obj *s3.Object, partSize int, fn func(buffer io.ReadCloser, isLastPart bool) (bool, error))

Source from the content-addressed store, hash-verified

653}
654
655func (c *Client) S3FileIterator(bucket string, s3Obj *s3.Object, partSize int, fn func(buffer io.ReadCloser, isLastPart bool) (bool, error)) error {
656 size := int(*s3Obj.Size)
657
658 iters := size / partSize
659 if size%partSize != 0 {
660 iters++
661 }
662
663 for i := 0; i < iters; i++ {
664 min := i * (partSize)
665 max := (i + 1) * (partSize)
666 if max > size {
667 max = size
668 }
669 max--
670
671 byteRange := fmt.Sprintf("bytes=%d-%d", min, max)
672 obj, err := c.S3().GetObject(&s3.GetObjectInput{
673 Bucket: aws.String(bucket),
674 Key: s3Obj.Key,
675 Range: aws.String(byteRange), // use range instead of part numbers because only files uploaded using multipart have parts
676 })
677 if err != nil {
678 return errors.Wrap(err, S3Path(bucket, *s3Obj.Key), "range "+byteRange)
679 }
680
681 isLastChunk := i+1 == iters
682 shouldContinue, err := fn(obj.Body, isLastChunk)
683 if err != nil {
684 return errors.Wrap(err, S3Path(bucket, *s3Obj.Key))
685 }
686
687 if !shouldContinue {
688 break
689 }
690 }
691
692 return nil
693}
694
695func (c *Client) ListS3Dir(bucket string, s3Dir string, includeDirObjects bool, maxResults *int64, startAfter *string) ([]*s3.Object, error) {
696 prefix := s.EnsureSuffix(s3Dir, "/")

Callers 1

enqueueS3FileContentsMethod · 0.80

Calls 4

S3Method · 0.95
WrapFunction · 0.92
S3PathFunction · 0.85
StringMethod · 0.45

Tested by

no test coverage detected