(ctx UnixParserCtx, event pipeline.Event, nodes []Node, collector *StageParseCollector)
| 231 | } |
| 232 | |
| 233 | func Parse(ctx UnixParserCtx, event pipeline.Event, nodes []Node, collector *StageParseCollector) (pipeline.Event, error) { |
| 234 | /* the stage is undefined, probably line is freshly acquired, set to first stage !*/ |
| 235 | if event.Stage == "" && len(ctx.Stages) > 0 { |
| 236 | event.Stage = ctx.Stages[0] |
| 237 | log.Tracef("no stage, set to : %s", event.Stage) |
| 238 | } |
| 239 | |
| 240 | event.Process = false |
| 241 | if event.Time.IsZero() { |
| 242 | event.Time = time.Now().UTC() |
| 243 | } |
| 244 | |
| 245 | if event.Parsed == nil { |
| 246 | event.Parsed = make(map[string]string) |
| 247 | } |
| 248 | |
| 249 | if event.Enriched == nil { |
| 250 | event.Enriched = make(map[string]string) |
| 251 | } |
| 252 | |
| 253 | if event.Meta == nil { |
| 254 | event.Meta = make(map[string]string) |
| 255 | } |
| 256 | |
| 257 | if event.Unmarshaled == nil { |
| 258 | event.Unmarshaled = make(map[string]any) |
| 259 | } |
| 260 | |
| 261 | if event.Type == pipeline.LOG { |
| 262 | log.Tracef("INPUT '%s'", event.Line.Raw) |
| 263 | } |
| 264 | |
| 265 | exprEnv := map[string]any{"evt": &event} |
| 266 | |
| 267 | for _, stage := range ctx.Stages { |
| 268 | /* if the node is forward in stages, seek to this stage */ |
| 269 | /* this is for example used by testing system to inject logs in post-syslog-parsing phase*/ |
| 270 | if slices.Index(ctx.Stages, event.Stage) > slices.Index(ctx.Stages, stage) { |
| 271 | log.Tracef("skipping stage, we are already at [%s] expecting [%s]", event.Stage, stage) |
| 272 | continue |
| 273 | } |
| 274 | |
| 275 | log.Tracef("node stage : %s, current stage : %s", event.Stage, stage) |
| 276 | |
| 277 | /* if the stage is wrong, it means that the log didn't manage to "pass" a stage with an onsuccess: next_stage tag */ |
| 278 | if event.Stage != stage { |
| 279 | log.Debugf("Event not parsed, expected stage '%s' got '%s', abort", stage, event.Stage) |
| 280 | event.Process = false |
| 281 | |
| 282 | return event, nil |
| 283 | } |
| 284 | |
| 285 | isStageOK := false |
| 286 | |
| 287 | for idx := range nodes { |
| 288 | // Only process current stage's nodes |
| 289 | if event.Stage != nodes[idx].Stage { |
| 290 | continue |
searching dependent graphs…