MCPcopy Index your code
hub / github.com/PatchMon/PatchMon / ProcessTask

Method ProcessTask

server-source-code/internal/queue/jobs.go:598–675  ·  view source on GitHub ↗

ProcessTask implements asynq.Handler.

(ctx context.Context, t *asynq.Task)

Source from the content-addressed store, hash-verified

596
597// ProcessTask implements asynq.Handler.
598func (h *RunPatchHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
599 var p RunPatchPayload
600 if err := json.Unmarshal(t.Payload(), &p); err != nil {
601 return err
602 }
603
604 // Resolve per-context DB when Host is in payload (multi-host mode).
605 if h.poolCache != nil && strings.TrimSpace(p.Host) != "" {
606 if db, err := h.poolCache.GetOrCreate(ctx, p.Host); err == nil && db != nil {
607 ctx = hostctx.WithDB(ctx, db)
608 }
609 }
610
611 if !h.registry.IsConnected(p.ApiID) {
612 // Check if the patch run still exists in the DB (user may have deleted it).
613 run, runErr := h.patchRuns.GetByID(ctx, p.PatchRunID)
614 if runErr != nil || run == nil {
615 h.log.Info("run_patch: patch run deleted or not found, dropping task", "api_id", p.ApiID, "patch_run_id", p.PatchRunID)
616 return nil
617 }
618
619 // Keep the correct status: pending_validation for dry runs, queued for real runs.
620 status := "queued"
621 if p.DryRun {
622 status = "pending_validation"
623 }
624
625 // If status has changed (e.g. cancelled, completed), don't re-queue.
626 if run.Status != status && run.Status != "queued" && run.Status != "pending_validation" {
627 h.log.Info("run_patch: run status changed, dropping task", "api_id", p.ApiID, "patch_run_id", p.PatchRunID, "status", run.Status)
628 return nil
629 }
630
631 _ = h.patchRuns.UpdateStatus(ctx, p.PatchRunID, status)
632
633 // Use a deterministic retry task ID so only one retry can exist at a time
634 // and the delete handler can cancel it.
635 retryTaskID := "patch-run-" + p.PatchRunID + "-retry"
636 task, err := NewRunPatchRetryTask(p, retryTaskID)
637 if err != nil {
638 return err
639 }
640 _, err = h.queueClient.Enqueue(task, asynq.ProcessIn(5*time.Minute))
641 if err != nil {
642 // If a task with this ID already exists (pending), that's fine - skip.
643 h.log.Debug("run_patch: re-enqueue skipped or failed", "api_id", p.ApiID, "error", err)
644 } else {
645 h.log.Info("run_patch: agent offline, re-queued in 5m", "api_id", p.ApiID, "patch_run_id", p.PatchRunID)
646 }
647 return nil
648 }
649
650 // Build run_patch payload
651 payload := map[string]interface{}{
652 "type": "run_patch",
653 "patch_run_id": p.PatchRunID,
654 "patch_type": p.PatchType,
655 "dry_run": p.DryRun,

Callers

nothing calls this directly

Calls 7

NewRunPatchRetryTaskFunction · 0.85
UnmarshalMethod · 0.80
IsConnectedMethod · 0.80
UpdateStatusMethod · 0.80
SendMessageMethod · 0.80
GetOrCreateMethod · 0.45
GetByIDMethod · 0.45

Tested by

no test coverage detected