MCPcopy
hub / github.com/micro/go-micro / Execute

Method Execute

flow/flow.go:216–302  ·  view source on GitHub ↗

Execute runs the flow once with the given input data. This is called automatically on each broker event, but can also be invoked directly for testing or one-shot use.

(ctx context.Context, data string)

Source from the content-addressed store, hash-verified

214// called automatically on each broker event, but can also be
215// invoked directly for testing or one-shot use.
216func (f *Flow) Execute(ctx context.Context, data string) error {
217 ctx, cancel := f.withTimeout(ctx)
218 defer cancel()
219
220 // Stepped flows run the ordered, checkpointed step loop.
221 if len(f.opts.Steps) > 0 {
222 _, err := f.startRun(ctx, data)
223 return err
224 }
225
226 runID := uuid.New().String()
227 info, _ := ai.RunInfoFrom(ctx)
228 info.RunID = runID
229 info.Flow = f.name
230 ctx = ai.WithRunInfo(ctx, info)
231
232 start := time.Now()
233
234 prompt := data
235 if f.tmpl != nil {
236 var buf bytes.Buffer
237 _ = f.tmpl.Execute(&buf, map[string]string{"Data": data})
238 prompt = buf.String()
239 }
240
241 result := Result{
242 FlowName: f.name,
243 Trigger: f.opts.TriggerTopic,
244 Prompt: prompt,
245 Timestamp: start,
246 }
247
248 // Flow triggers, Agent reasons: hand the event to the named agent.
249 if f.opts.Agent != "" {
250 reply, err := f.callAgent(ctx, f.opts.Agent, prompt)
251 result.Duration = time.Since(start).Seconds()
252 if err != nil {
253 result.Error = err.Error()
254 result.ErrorKind = string(ai.ClassifyError(err))
255 f.record(result)
256 return err
257 }
258 result.Reply = reply
259 f.record(result)
260 f.log.Logf(logger.InfoLevel, "Flow %s dispatched to agent %s in %.1fs",
261 f.name, f.opts.Agent, result.Duration)
262 return nil
263 }
264
265 // Otherwise run a single augmented-LLM step with the services as tools.
266 discovered, err := f.toolSet.Discover()
267 if err != nil {
268 result.Duration = time.Since(start).Seconds()
269 result.Error = err.Error()
270 result.ErrorKind = string(ai.ClassifyError(err))
271 f.record(result)
272 return fmt.Errorf("discover tools: %w", err)
273 }

Calls 15

withTimeout — Method · 0.95
startRun — Method · 0.95
callAgent — Method · 0.95
record — Method · 0.95
RunInfoFrom — Function · 0.92
WithRunInfo — Function · 0.92
ClassifyError — Function · 0.92
Since — Method · 0.80
Discover — Method · 0.80
String — Method · 0.65
Error — Method · 0.65
Logf — Method · 0.65