MCPcopy
hub / github.com/crowdsecurity/crowdsec / ReadFromShard

Method ReadFromShard

pkg/acquisition/modules/kinesis/run.go:330–387  ·  view source on GitHub ↗
(ctx context.Context, out chan pipeline.Event, shardID string)

Source from the content-addressed store, hash-verified

328}
329
330func (s *Source) ReadFromShard(ctx context.Context, out chan pipeline.Event, shardID string) error {
331 logger := s.logger.WithField("shard", shardID)
332 logger.Debugf("Starting to read shard")
333
334 sharIt, err := s.kClient.GetShardIterator(ctx,
335 &kinesis.GetShardIteratorInput{
336 ShardId: aws.String(shardID),
337 StreamName: &s.Config.StreamName,
338 ShardIteratorType: kinTypes.ShardIteratorTypeLatest,
339 })
340 if err != nil {
341 logger.Errorf("Cannot get shard iterator: %s", err)
342 return fmt.Errorf("cannot get shard iterator: %w", err)
343 }
344
345 it := sharIt.ShardIterator
346 // AWS recommends to wait for a second between calls to GetRecords for a given shard
347 ticker := time.NewTicker(time.Second)
348
349 for {
350 select {
351 case <-ticker.C:
352 records, err := s.kClient.GetRecords(ctx, &kinesis.GetRecordsInput{ShardIterator: it})
353
354 var throughputErr *kinTypes.ProvisionedThroughputExceededException
355 if errors.As(err, &throughputErr) {
356 logger.Warn("Provisioned throughput exceeded")
357 // TODO: implement exponential backoff
358 continue
359 }
360
361 var expiredIteratorErr *kinTypes.ExpiredIteratorException
362 if errors.As(err, &expiredIteratorErr) {
363 logger.Warn("Expired iterator")
364 continue
365 }
366
367 if err != nil {
368 logger.Error("Cannot get records")
369 return fmt.Errorf("cannot get records: %w", err)
370 }
371
372 it = records.NextShardIterator
373
374 s.ParseAndPushRecords(records.Records, out, logger, shardID)
375
376 if it == nil {
377 logger.Warnf("Shard has been closed")
378 return nil
379 }
380 case <-s.shardReaderTomb.Dying():
381 logger.Infof("shardReaderTomb is dying, exiting ReadFromShard")
382 ticker.Stop()
383
384 return nil
385 }
386 }
387}

Callers 1

ReadFromStreamMethod · 0.95

Calls 5

ParseAndPushRecordsMethod · 0.95
StopMethod · 0.80
ErrorMethod · 0.65
StringMethod · 0.45
WarnMethod · 0.45

Tested by

no test coverage detected