| 336 | } |
| 337 | |
| 338 | func (s *Source) readFile(bucket string, key string) error { |
| 339 | // TODO: Handle SSE-C |
| 340 | var scanner *bufio.Scanner |
| 341 | |
| 342 | logger := s.logger.WithFields(log.Fields{ |
| 343 | "method": "readFile", |
| 344 | "bucket": bucket, |
| 345 | "key": key, |
| 346 | }) |
| 347 | |
| 348 | output, err := s.s3Client.GetObject(s.ctx, &s3.GetObjectInput{ |
| 349 | Bucket: aws.String(bucket), |
| 350 | Key: aws.String(key), |
| 351 | }) |
| 352 | if err != nil { |
| 353 | return fmt.Errorf("failed to get object %s/%s: %w", bucket, key, err) |
| 354 | } |
| 355 | defer output.Body.Close() |
| 356 | |
| 357 | if strings.HasSuffix(key, ".gz") { |
| 358 | // This *might* be a gzipped file, but sometimes the SDK will decompress the data for us (it's not clear when it happens, only had the issue with cloudtrail logs) |
| 359 | header := make([]byte, 2) |
| 360 | |
| 361 | _, err := output.Body.Read(header) |
| 362 | if err != nil { |
| 363 | return fmt.Errorf("failed to read header of object %s/%s: %w", bucket, key, err) |
| 364 | } |
| 365 | |
| 366 | if header[0] == 0x1f && header[1] == 0x8b { |
| 367 | gz, err := gzip.NewReader(io.MultiReader(bytes.NewReader(header), output.Body)) |
| 368 | if err != nil { |
| 369 | return fmt.Errorf("failed to create gzip reader for object %s/%s: %w", bucket, key, err) |
| 370 | } |
| 371 | scanner = bufio.NewScanner(gz) |
| 372 | } else { |
| 373 | scanner = bufio.NewScanner(io.MultiReader(bytes.NewReader(header), output.Body)) |
| 374 | } |
| 375 | } else { |
| 376 | scanner = bufio.NewScanner(output.Body) |
| 377 | } |
| 378 | |
| 379 | if s.Config.MaxBufferSize > 0 { |
| 380 | s.logger.Infof("Setting max buffer size to %d", s.Config.MaxBufferSize) |
| 381 | |
| 382 | buf := make([]byte, 0, bufio.MaxScanTokenSize) |
| 383 | scanner.Buffer(buf, s.Config.MaxBufferSize) |
| 384 | } |
| 385 | |
| 386 | for scanner.Scan() { |
| 387 | select { |
| 388 | case <-s.t.Dying(): |
| 389 | s.logger.Infof("Shutting down reader for %s/%s", bucket, key) |
| 390 | return nil |
| 391 | default: |
| 392 | text := scanner.Text() |
| 393 | logger.Tracef("Read line %s", text) |
| 394 | |
| 395 | if s.metricsLevel != metrics.AcquisitionMetricsLevelNone { |