MCPcopy
hub / github.com/crowdsecurity/crowdsec / ReadFromStream

Method ReadFromStream

pkg/acquisition/modules/kinesis/run.go:389–431  ·  view source on GitHub ↗
(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb)

Source from the content-addressed store, hash-verified

387}
388
389func (s *Source) ReadFromStream(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
390 s.logger = s.logger.WithField("stream", s.Config.StreamName)
391 s.logger.Info("starting kinesis acquisition from shards")
392
393 for {
394 shards, err := s.kClient.ListShards(ctx, &kinesis.ListShardsInput{
395 StreamName: aws.String(s.Config.StreamName),
396 })
397 if err != nil {
398 return fmt.Errorf("cannot list shards: %w", err)
399 }
400
401 s.shardReaderTomb = &tomb.Tomb{}
402
403 for _, shard := range shards.Shards {
404 shardID := *shard.ShardId
405
406 s.shardReaderTomb.Go(func() error {
407 defer trace.ReportPanic()
408 return s.ReadFromShard(ctx, out, shardID)
409 })
410 }
411
412 select {
413 case <-t.Dying():
414 s.logger.Info("kinesis source is dying")
415 s.shardReaderTomb.Kill(nil)
416 _ = s.shardReaderTomb.Wait() // we don't care about the error as we kill the tomb ourselves
417
418 return nil
419 case <-s.shardReaderTomb.Dying():
420 reason := s.shardReaderTomb.Err()
421 if reason != nil {
422 s.logger.Errorf("Unexpected error from shard reader : %s", reason)
423 return reason
424 }
425
426 s.logger.Infof("All shards have been closed, probably a resharding event, restarting acquisition")
427
428 continue
429 }
430 }
431}
432
433func (s *Source) StreamingAcquisition(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
434 t.Go(func() error {

Callers 1

StreamingAcquisition (Method) · 0.95

Calls 5

ReadFromShard (Method) · 0.95
Kill (Method) · 0.80
Err (Method) · 0.80
Info (Method) · 0.45
String (Method) · 0.45

Tested by

no test coverage detected