MCPcopy
hub / github.com/crowdsecurity/crowdsec / CatLogStream

Method CatLogStream

pkg/acquisition/modules/cloudwatch/run.go:378–446  ·  view source on GitHub ↗
(ctx context.Context, cfg *LogStreamTailConfig, outChan chan pipeline.Event)

Source from the content-addressed store, hash-verified

376}
377
378func (s *Source) CatLogStream(ctx context.Context, cfg *LogStreamTailConfig, outChan chan pipeline.Event) error {
379 var startFrom *string
380
381 head := true
382 // convert the times
383 startTime := cfg.StartTime.UTC().Unix() * 1000
384 endTime := cfg.EndTime.UTC().Unix() * 1000
385
386 hasMoreEvents := true
387 for hasMoreEvents {
388 select {
389 default:
390 cfg.logger.Tracef("Calling GetLogEventsPagesWithContext(%s, %s), startTime:%d / endTime:%d",
391 cfg.GroupName, cfg.StreamName, startTime, endTime)
392 cfg.logger.Tracef("startTime:%s / endTime:%s", cfg.StartTime, cfg.EndTime)
393
394 if startFrom != nil {
395 cfg.logger.Tracef("next_token: %s", *startFrom)
396 }
397
398 p := cloudwatchlogs.NewGetLogEventsPaginator(
399 s.cwClient,
400 &cloudwatchlogs.GetLogEventsInput{
401 Limit: aws.Int32(10),
402 LogGroupName: aws.String(cfg.GroupName),
403 LogStreamName: aws.String(cfg.StreamName),
404 StartTime: aws.Int64(startTime),
405 EndTime: aws.Int64(endTime),
406 StartFromHead: &head,
407 NextToken: startFrom,
408 },
409 )
410 for p.HasMorePages() {
411 page, err := p.NextPage(ctx)
412 if err != nil {
413 return fmt.Errorf("while reading logs from %s/%s: %w", cfg.GroupName, cfg.StreamName, err)
414 }
415
416 for _, e := range page.Events {
417 evt, err := cwLogToEvent(e, cfg)
418 if err != nil {
419 cfg.logger.Warningf("discard event: %s", err)
420 }
421
422 cfg.logger.Debugf("pushing message: %s", evt.Line.Raw)
423
424 outChan <- evt
425 }
426
427 if startFrom != nil && page.NextForwardToken != nil && *page.NextForwardToken == *startFrom {
428 cfg.logger.Debugf("reached end of available events")
429 hasMoreEvents = false
430 break
431 }
432
433 startFrom = page.NextForwardToken
434 }
435

Callers 1

OneShotAcquisition (Method) · 0.95

Calls 3

cwLogToEvent (Function) · 0.85
Tracef (Method) · 0.80
String (Method) · 0.45

Tested by

no test coverage detected