MCPcopy Index your code
hub / github.com/PatchMon/PatchMon / ProcessTask

Method ProcessTask

server-source-code/internal/queue/jobs.go:353–402  ·  view source on GitHub ↗

ProcessTask implements asynq.Handler.

(ctx context.Context, t *asynq.Task)

Source from the content-addressed store, hash-verified

351
352// ProcessTask implements asynq.Handler.
353func (h *ReportNowHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
354 var p ReportNowPayload
355 if err := json.Unmarshal(t.Payload(), &p); err != nil {
356 return err
357 }
358
359 taskID, _ := asynq.GetTaskID(ctx)
360 retryCount, _ := asynq.GetRetryCount(ctx)
361 attempt := int32(retryCount + 1)
362
363 // Log to job_history on first attempt so it persists in Agent Queue tab (like BullMQ)
364 if h.db != nil && taskID != "" && retryCount == 0 {
365 host, err := h.db.Queries.GetHostByApiID(ctx, p.ApiID)
366 var hostID *string
367 if err == nil {
368 hostID = &host.ID
369 }
370 apiIDPtr := &p.ApiID
371 _ = h.db.Queries.InsertJobHistory(ctx, db.InsertJobHistoryParams{
372 ID: uuid.New().String(),
373 JobID: taskID,
374 QueueName: QueueAgentCommands,
375 JobName: TypeReportNow,
376 HostID: hostID,
377 ApiID: apiIDPtr,
378 Status: "active",
379 AttemptNumber: attempt,
380 })
381 }
382
383 if !h.registry.IsConnected(p.ApiID) {
384 h.log.Warn("report_now: agent not connected", "api_id", p.ApiID)
385 if taskID != "" && h.db != nil {
386 msg := "Agent not connected"
387 _ = h.db.Queries.UpdateJobHistoryFailed(ctx, db.UpdateJobHistoryFailedParams{JobID: taskID, ErrorMessage: &msg})
388 }
389 return nil // Don't retry - agent may connect later, user can retry
390 }
391 msg := []byte(`{"type":"report_now"}`)
392 if err := h.registry.SendMessage(p.ApiID, websocket.TextMessage, msg); err != nil {
393 h.log.Warn("report_now: write failed", "api_id", p.ApiID, "error", err)
394 return err // Retry on write failure - don't update job_history yet
395 }
396
397 if taskID != "" && h.db != nil {
398 _ = h.db.Queries.UpdateJobHistoryCompleted(ctx, taskID)
399 }
400 h.log.Info("report_now sent", "api_id", p.ApiID)
401 return nil
402}
403
404// sendAgentCommand is a helper that sends a JSON command to the agent and updates job_history.
405func sendAgentCommand(ctx context.Context, h *ReportNowHandler, p ReportNowPayload, msgType, taskID string, retryCount int) error {

Callers

nothing calls this directly

Calls 7

UnmarshalMethod · 0.80
IsConnectedMethod · 0.80
SendMessageMethod · 0.80
GetHostByApiIDMethod · 0.65
InsertJobHistoryMethod · 0.65

Tested by

no test coverage detected