MCPcopy
hub / github.com/google/ax / Exec

Method Exec

internal/server/server.go:57–74  ·  view source on GitHub ↗

Exec executes a new agentic task with streaming responses.

(req *proto.ExecRequest, stream grpc.ServerStreamingServer[proto.ExecResponse])

Source from the content-addressed store, hash-verified

55
56// Exec executes a new agentic task with streaming responses.
57func (s *Server) Exec(req *proto.ExecRequest, stream grpc.ServerStreamingServer[proto.ExecResponse]) error {
58 ctx := stream.Context()
59 slog.InfoContext(ctx, "Executing...",
60 slog.String("conversation_id", req.ConversationId))
61
62 inFlight, cleanup := s.markInFlight(req.ConversationId)
63 if inFlight {
64 return status.Errorf(codes.FailedPrecondition, "conversation %q is already in flight", req.ConversationId)
65 }
66 defer cleanup()
67
68 outputHandler := controller.ExecHandler(func(resp *proto.ExecResponse) error {
69 return stream.Send(resp)
70 })
71 err := s.controller.Exec(ctx, req, outputHandler)
72 go suspendActor(req.ConversationId) // TODO(jbd): Move to an interceptor.
73 return err
74}
75
76func (s *Server) ForkConversation(ctx context.Context, req *proto.ForkConversationRequest) (*proto.ForkConversationResponse, error) {
77 slog.InfoContext(ctx, "Forking conversation...",

Callers

nothing calls this directly

Calls 5

markInFlightMethod · 0.95
ExecHandlerFuncType · 0.92
suspendActorFunction · 0.70
ExecMethod · 0.65
StringMethod · 0.45

Tested by

no test coverage detected