MCPcopy
hub / github.com/mudler/LocalAI / handleMCPCIJob

Function handleMCPCIJob

core/cli/agent_worker.go:368–522  ·  view source on GitHub ↗

handleMCPCIJob processes an MCP CI job on the agent worker. The agent worker can create MCP sessions (has docker) and call the LocalAI API for inference.

(shutdownCtx context.Context, data []byte, apiURL, apiToken string, natsClient messaging.MessagingClient, jobTimeout time.Duration)

Source from the content-addressed store, hash-verified

366// handleMCPCIJob processes an MCP CI job on the agent worker.
367// The agent worker can create MCP sessions (has docker) and call the LocalAI API for inference.
368func handleMCPCIJob(shutdownCtx context.Context, data []byte, apiURL, apiToken string, natsClient messaging.MessagingClient, jobTimeout time.Duration) {
369 var evt jobs.JobEvent
370 if err := json.Unmarshal(data, &evt); err != nil {
371 xlog.Error("Failed to unmarshal job event", "error", err)
372 return
373 }
374
375 job := evt.Job
376 task := evt.Task
377 if job == nil || task == nil {
378 xlog.Error("MCP CI job missing enriched data", "jobID", evt.JobID)
379 publishJobResult(natsClient, evt.JobID, "failed", "", "job or task data missing from NATS event")
380 return
381 }
382
383 modelCfg := evt.ModelConfig
384 if modelCfg == nil {
385 publishJobResult(natsClient, evt.JobID, "failed", "", "model config missing from job event")
386 return
387 }
388
389 xlog.Info("Processing MCP CI job", "jobID", evt.JobID, "taskID", evt.TaskID, "model", task.Model)
390
391 // Publish running status
392 natsClient.Publish(messaging.SubjectJobProgress(evt.JobID), jobs.ProgressEvent{
393 JobID: evt.JobID, Status: "running", Message: "Job started on agent worker",
394 })
395
396 // Parse MCP config
397 if modelCfg.MCP.Servers == "" && modelCfg.MCP.Stdio == "" {
398 publishJobResult(natsClient, evt.JobID, "failed", "", "no MCP servers configured for model")
399 return
400 }
401
402 remote, stdio, err := modelCfg.MCP.MCPConfigFromYAML()
403 if err != nil {
404 publishJobResult(natsClient, evt.JobID, "failed", "", fmt.Sprintf("failed to parse MCP config: %v", err))
405 return
406 }
407
408 // Create MCP sessions locally (agent worker has docker)
409 sessions, err := mcpTools.SessionsFromMCPConfig(modelCfg.Name, remote, stdio)
410 if err != nil || len(sessions) == 0 {
411 errMsg := "no working MCP servers found"
412 if err != nil {
413 errMsg = fmt.Sprintf("failed to create MCP sessions: %v", err)
414 }
415 publishJobResult(natsClient, evt.JobID, "failed", "", errMsg)
416 return
417 }
418
419 // Build prompt from template
420 prompt := task.Prompt
421 if task.CronParametersJSON != "" {
422 var params map[string]string
423 if err := json.Unmarshal([]byte(task.CronParametersJSON), &params); err != nil {
424 xlog.Warn("Failed to unmarshal parameters", "error", err)
425 }

Callers 1

Run (Method) · 0.85

Calls 10

SubjectJobProgress (Function) · 0.92
publishJobResult (Function) · 0.85
publishJobStatus (Function) · 0.85
MCPConfigFromYAML (Method) · 0.80
BuildCogitoOptions (Method) · 0.80
Publish (Method) · 0.65
String (Method) · 0.65
Error (Method) · 0.45
Len (Method) · 0.45
Reset (Method) · 0.45

Tested by

no test coverage detected