sendAgentCommand is a helper that sends a JSON command to the agent and updates job_history.
(ctx context.Context, h *ReportNowHandler, p ReportNowPayload, msgType, taskID string, retryCount int)
| 403 | |
| 404 | // sendAgentCommand is a helper that sends a JSON command to the agent and updates job_history. |
| 405 | func sendAgentCommand(ctx context.Context, h *ReportNowHandler, p ReportNowPayload, msgType, taskID string, retryCount int) error { |
| 406 | taskIDVal, _ := asynq.GetTaskID(ctx) |
| 407 | if taskID == "" { |
| 408 | taskID = taskIDVal |
| 409 | } |
| 410 | attempt := int32(retryCount + 1) |
| 411 | |
| 412 | if h.db != nil && taskID != "" && retryCount == 0 { |
| 413 | host, err := h.db.Queries.GetHostByApiID(ctx, p.ApiID) |
| 414 | var hostID *string |
| 415 | if err == nil { |
| 416 | hostID = &host.ID |
| 417 | } |
| 418 | apiIDPtr := &p.ApiID |
| 419 | _ = h.db.Queries.InsertJobHistory(ctx, db.InsertJobHistoryParams{ |
| 420 | ID: uuid.New().String(), |
| 421 | JobID: taskID, |
| 422 | QueueName: QueueAgentCommands, |
| 423 | JobName: msgType, |
| 424 | HostID: hostID, |
| 425 | ApiID: apiIDPtr, |
| 426 | Status: "active", |
| 427 | AttemptNumber: attempt, |
| 428 | }) |
| 429 | } |
| 430 | |
| 431 | if !h.registry.IsConnected(p.ApiID) { |
| 432 | h.log.Warn(msgType+": agent not connected", "api_id", p.ApiID) |
| 433 | if taskID != "" && h.db != nil { |
| 434 | msg := "Agent not connected" |
| 435 | _ = h.db.Queries.UpdateJobHistoryFailed(ctx, db.UpdateJobHistoryFailedParams{JobID: taskID, ErrorMessage: &msg}) |
| 436 | } |
| 437 | return nil |
| 438 | } |
| 439 | msg := []byte(`{"type":"` + msgType + `"}`) |
| 440 | if err := h.registry.SendMessage(p.ApiID, websocket.TextMessage, msg); err != nil { |
| 441 | h.log.Warn(msgType+": write failed", "api_id", p.ApiID, "error", err) |
| 442 | return err |
| 443 | } |
| 444 | |
| 445 | if taskID != "" && h.db != nil { |
| 446 | _ = h.db.Queries.UpdateJobHistoryCompleted(ctx, taskID) |
| 447 | } |
| 448 | h.log.Info(msgType+" sent", "api_id", p.ApiID) |
| 449 | return nil |
| 450 | } |
| 451 | |
| 452 | // RefreshIntegrationStatusHandler handles refresh_integration_status jobs. |
| 453 | type RefreshIntegrationStatusHandler struct { |
no test coverage detected