Execute runs the flow once with the given input data. It is called automatically on each broker event, but can also be invoked directly for testing or one-shot use.
(ctx context.Context, data string)
| 214 | // called automatically on each broker event, but can also be |
| 215 | // invoked directly for testing or one-shot use. |
| 216 | func (f *Flow) Execute(ctx context.Context, data string) error { |
| 217 | ctx, cancel := f.withTimeout(ctx) |
| 218 | defer cancel() |
| 219 | |
| 220 | // Stepped flows run the ordered, checkpointed step loop. |
| 221 | if len(f.opts.Steps) > 0 { |
| 222 | _, err := f.startRun(ctx, data) |
| 223 | return err |
| 224 | } |
| 225 | |
| 226 | runID := uuid.New().String() |
| 227 | info, _ := ai.RunInfoFrom(ctx) |
| 228 | info.RunID = runID |
| 229 | info.Flow = f.name |
| 230 | ctx = ai.WithRunInfo(ctx, info) |
| 231 | |
| 232 | start := time.Now() |
| 233 | |
| 234 | prompt := data |
| 235 | if f.tmpl != nil { |
| 236 | var buf bytes.Buffer |
| 237 | _ = f.tmpl.Execute(&buf, map[string]string{"Data": data}) |
| 238 | prompt = buf.String() |
| 239 | } |
| 240 | |
| 241 | result := Result{ |
| 242 | FlowName: f.name, |
| 243 | Trigger: f.opts.TriggerTopic, |
| 244 | Prompt: prompt, |
| 245 | Timestamp: start, |
| 246 | } |
| 247 | |
| 248 | // Flow triggers, Agent reasons: hand the event to the named agent. |
| 249 | if f.opts.Agent != "" { |
| 250 | reply, err := f.callAgent(ctx, f.opts.Agent, prompt) |
| 251 | result.Duration = time.Since(start).Seconds() |
| 252 | if err != nil { |
| 253 | result.Error = err.Error() |
| 254 | result.ErrorKind = string(ai.ClassifyError(err)) |
| 255 | f.record(result) |
| 256 | return err |
| 257 | } |
| 258 | result.Reply = reply |
| 259 | f.record(result) |
| 260 | f.log.Logf(logger.InfoLevel, "Flow %s dispatched to agent %s in %.1fs", |
| 261 | f.name, f.opts.Agent, result.Duration) |
| 262 | return nil |
| 263 | } |
| 264 | |
| 265 | // Otherwise run a single augmented-LLM step with the services as tools. |
| 266 | discovered, err := f.toolSet.Discover() |
| 267 | if err != nil { |
| 268 | result.Duration = time.Since(start).Seconds() |
| 269 | result.Error = err.Error() |
| 270 | result.ErrorKind = string(ai.ClassifyError(err)) |
| 271 | f.record(result) |
| 272 | return fmt.Errorf("discover tools: %w", err) |
| 273 | } |