(ctx context.Context, cfg *LogStreamTailConfig, outChan chan pipeline.Event)
| 376 | } |
| 377 | |
| 378 | func (s *Source) CatLogStream(ctx context.Context, cfg *LogStreamTailConfig, outChan chan pipeline.Event) error { |
| 379 | var startFrom *string |
| 380 | |
| 381 | head := true |
| 382 | // convert the times |
| 383 | startTime := cfg.StartTime.UTC().Unix() * 1000 |
| 384 | endTime := cfg.EndTime.UTC().Unix() * 1000 |
| 385 | |
| 386 | hasMoreEvents := true |
| 387 | for hasMoreEvents { |
| 388 | select { |
| 389 | default: |
| 390 | cfg.logger.Tracef("Calling GetLogEventsPagesWithContext(%s, %s), startTime:%d / endTime:%d", |
| 391 | cfg.GroupName, cfg.StreamName, startTime, endTime) |
| 392 | cfg.logger.Tracef("startTime:%s / endTime:%s", cfg.StartTime, cfg.EndTime) |
| 393 | |
| 394 | if startFrom != nil { |
| 395 | cfg.logger.Tracef("next_token: %s", *startFrom) |
| 396 | } |
| 397 | |
| 398 | p := cloudwatchlogs.NewGetLogEventsPaginator( |
| 399 | s.cwClient, |
| 400 | &cloudwatchlogs.GetLogEventsInput{ |
| 401 | Limit: aws.Int32(10), |
| 402 | LogGroupName: aws.String(cfg.GroupName), |
| 403 | LogStreamName: aws.String(cfg.StreamName), |
| 404 | StartTime: aws.Int64(startTime), |
| 405 | EndTime: aws.Int64(endTime), |
| 406 | StartFromHead: &head, |
| 407 | NextToken: startFrom, |
| 408 | }, |
| 409 | ) |
| 410 | for p.HasMorePages() { |
| 411 | page, err := p.NextPage(ctx) |
| 412 | if err != nil { |
| 413 | return fmt.Errorf("while reading logs from %s/%s: %w", cfg.GroupName, cfg.StreamName, err) |
| 414 | } |
| 415 | |
| 416 | for _, e := range page.Events { |
| 417 | evt, err := cwLogToEvent(e, cfg) |
| 418 | if err != nil { |
| 419 | cfg.logger.Warningf("discard event: %s", err) |
| 420 | } |
| 421 | |
| 422 | cfg.logger.Debugf("pushing message: %s", evt.Line.Raw) |
| 423 | |
| 424 | outChan <- evt |
| 425 | } |
| 426 | |
| 427 | if startFrom != nil && page.NextForwardToken != nil && *page.NextForwardToken == *startFrom { |
| 428 | cfg.logger.Debugf("reached end of available events") |
| 429 | hasMoreEvents = false |
| 430 | break |
| 431 | } |
| 432 | |
| 433 | startFrom = page.NextForwardToken |
| 434 | } |
| 435 |
no test coverage detected