( ctx context.Context, source types.DataSource, name string, output chan pipeline.Event, acquisTomb *tomb.Tomb, )
| 579 | } |
| 580 | |
| 581 | func acquireSource( |
| 582 | ctx context.Context, |
| 583 | source types.DataSource, |
| 584 | name string, |
| 585 | output chan pipeline.Event, |
| 586 | acquisTomb *tomb.Tomb, |
| 587 | ) error { |
| 588 | if source.GetMode() == configuration.CAT_MODE { |
| 589 | if s, ok := source.(types.BatchFetcher); ok { |
| 590 | // s.Logger.Info("Start OneShot") |
| 591 | return runBatchFetcher(ctx, s, output, acquisTomb) |
| 592 | } |
| 593 | |
| 594 | if s, ok := source.(types.Fetcher); ok { |
| 595 | // s.Logger.Info("Start OneShotAcquisition") |
| 596 | return s.OneShotAcquisition(ctx, output, acquisTomb) |
| 597 | } |
| 598 | |
| 599 | return fmt.Errorf("%s: cat mode is set but OneShotAcquisition is not supported", source.GetName()) |
| 600 | } |
| 601 | |
| 602 | if s, ok := source.(types.Tailer); ok { |
| 603 | // s.Logger.Info("Streaming Acquisition") |
| 604 | return s.StreamingAcquisition(ctx, output, acquisTomb) |
| 605 | } |
| 606 | |
| 607 | if s, ok := source.(types.RestartableStreamer); ok { |
| 608 | return runRestartableStream(ctx, s, name, output, acquisTomb) |
| 609 | } |
| 610 | |
| 611 | return fmt.Errorf("%s: tail mode is set but the datasource does not support streaming acquisition", source.GetName()) |
| 612 | } |
| 613 | |
| 614 | func StartAcquisition( |
| 615 | ctx context.Context, |
no test coverage detected
searching dependent graphs…