( ctx context.Context, sources []types.DataSource, output chan pipeline.Event, acquisTomb *tomb.Tomb, )
| 612 | } |
| 613 | |
| 614 | func StartAcquisition( |
| 615 | ctx context.Context, |
| 616 | sources []types.DataSource, |
| 617 | output chan pipeline.Event, |
| 618 | acquisTomb *tomb.Tomb, |
| 619 | ) error { |
| 620 | // Don't wait if we have no sources, as it will hang forever |
| 621 | if len(sources) == 0 { |
| 622 | return nil |
| 623 | } |
| 624 | |
| 625 | for i := range sources { |
| 626 | subsrc := sources[i] // ensure it's a copy |
| 627 | log.Debugf("starting one source %d/%d ->> %T", i, len(sources), subsrc) |
| 628 | |
| 629 | acquisTomb.Go(func() error { |
| 630 | defer trace.ReportPanic() |
| 631 | |
| 632 | outChan := output |
| 633 | |
| 634 | log.Debugf("datasource %s UUID: %s", subsrc.GetName(), subsrc.GetUuid()) |
| 635 | |
| 636 | if transformRuntime, ok := transformRuntimes[subsrc.GetUuid()]; ok { |
| 637 | log.Infof("transform expression found for datasource %s", subsrc.GetName()) |
| 638 | |
| 639 | transformChan := make(chan pipeline.Event) |
| 640 | outChan = transformChan |
| 641 | transformLogger := log.WithFields(log.Fields{ |
| 642 | "component": "transform", |
| 643 | "datasource": subsrc.GetName(), |
| 644 | }) |
| 645 | |
| 646 | acquisTomb.Go(func() error { |
| 647 | defer trace.ReportPanic() |
| 648 | transform(outChan, output, acquisTomb, transformRuntime, transformLogger) |
| 649 | return nil |
| 650 | }) |
| 651 | } |
| 652 | |
| 653 | if err := acquireSource(ctx, subsrc, subsrc.GetName(), output, acquisTomb); err != nil { |
| 654 | // if one of the acquisitions returns an error, we kill the others to properly shutdown |
| 655 | acquisTomb.Kill(err) |
| 656 | } |
| 657 | |
| 658 | return nil |
| 659 | }) |
| 660 | } |
| 661 | |
| 662 | // return only when acquisition is over (cat) or never (tail) |
| 663 | err := acquisTomb.Wait() |
| 664 | |
| 665 | return err |
| 666 | } |
searching dependent graphs…