| 47 | } |
| 48 | |
| 49 | func (p *ConditionalProcessor) AfterBucketPour(f *BucketFactory, msg pipeline.Event, l *Leaky) *pipeline.Event { |
| 50 | var condition, ok bool |
| 51 | |
| 52 | if p.ConditionalFilterRuntime != nil { |
| 53 | l.logger.Debugf("Running condition expression : %s", p.ConditionalFilter) |
| 54 | |
| 55 | ret, err := exprhelpers.Run(p.ConditionalFilterRuntime, |
| 56 | map[string]any{"evt": &msg, "queue": l.Queue, "leaky": l}, |
| 57 | l.logger, f.Spec.Debug) |
| 58 | if err != nil { |
| 59 | l.logger.Errorf("unable to run conditional filter : %s", err) |
| 60 | return &msg |
| 61 | } |
| 62 | |
| 63 | l.logger.Debugf("Conditional bucket expression returned : %v", ret) |
| 64 | |
| 65 | if condition, ok = ret.(bool); !ok { |
| 66 | l.logger.Warningf("overflow condition, unexpected non-bool return : %T", ret) |
| 67 | return &msg |
| 68 | } |
| 69 | |
| 70 | if condition { |
| 71 | l.logger.Debugf("Conditional bucket overflow") |
| 72 | l.Ovflw_ts = l.Last_ts |
| 73 | l.Out <- l.Queue |
| 74 | return nil |
| 75 | } |
| 76 | } |
| 77 | |
| 78 | return &msg |
| 79 | } |