| 30 | ) |
| 31 | |
| 32 | func (p *CancelProcessor) OnBucketPour(_ *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event { |
| 33 | var condition, ok bool |
| 34 | if p.CancelOnFilter != nil { |
| 35 | leaky.logger.Tracef("running cancel_on filter") |
| 36 | output, err := exprhelpers.Run(p.CancelOnFilter, map[string]any{"evt": &msg}, leaky.logger, p.Debug) |
| 37 | if err != nil { |
| 38 | leaky.logger.Warningf("cancel_on error : %s", err) |
| 39 | return &msg |
| 40 | } |
| 41 | if condition, ok = output.(bool); !ok { |
| 42 | leaky.logger.Warningf("cancel_on, unexpected non-bool return : %T", output) |
| 43 | return &msg |
| 44 | } |
| 45 | if condition { |
| 46 | leaky.logger.Debugf("reset_filter matched, kill bucket") |
| 47 | leaky.Suicide <- true |
| 48 | return nil // counter intuitively, we need to keep the message so that it doesn't trigger an endless loop |
| 49 | } |
| 50 | leaky.logger.Debugf("reset_filter didn't match") |
| 51 | } |
| 52 | return &msg |
| 53 | } |
| 54 | |
| 55 | func (*CancelProcessor) OnBucketOverflow(_ *BucketFactory, _ *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) { |
| 56 | return alert, queue |