ProcessTask implements asynq.Handler.
(ctx context.Context, t *asynq.Task)
| 596 | |
| 597 | // ProcessTask implements asynq.Handler. |
| 598 | func (h *RunPatchHandler) ProcessTask(ctx context.Context, t *asynq.Task) error { |
| 599 | var p RunPatchPayload |
| 600 | if err := json.Unmarshal(t.Payload(), &p); err != nil { |
| 601 | return err |
| 602 | } |
| 603 | |
| 604 | // Resolve per-context DB when Host is in payload (multi-host mode). |
| 605 | if h.poolCache != nil && strings.TrimSpace(p.Host) != "" { |
| 606 | if db, err := h.poolCache.GetOrCreate(ctx, p.Host); err == nil && db != nil { |
| 607 | ctx = hostctx.WithDB(ctx, db) |
| 608 | } |
| 609 | } |
| 610 | |
| 611 | if !h.registry.IsConnected(p.ApiID) { |
| 612 | // Check if the patch run still exists in the DB (user may have deleted it). |
| 613 | run, runErr := h.patchRuns.GetByID(ctx, p.PatchRunID) |
| 614 | if runErr != nil || run == nil { |
| 615 | h.log.Info("run_patch: patch run deleted or not found, dropping task", "api_id", p.ApiID, "patch_run_id", p.PatchRunID) |
| 616 | return nil |
| 617 | } |
| 618 | |
| 619 | // Keep the correct status: pending_validation for dry runs, queued for real runs. |
| 620 | status := "queued" |
| 621 | if p.DryRun { |
| 622 | status = "pending_validation" |
| 623 | } |
| 624 | |
| 625 | // If status has changed (e.g. cancelled, completed), don't re-queue. |
| 626 | if run.Status != status && run.Status != "queued" && run.Status != "pending_validation" { |
| 627 | h.log.Info("run_patch: run status changed, dropping task", "api_id", p.ApiID, "patch_run_id", p.PatchRunID, "status", run.Status) |
| 628 | return nil |
| 629 | } |
| 630 | |
| 631 | _ = h.patchRuns.UpdateStatus(ctx, p.PatchRunID, status) |
| 632 | |
| 633 | // Use a deterministic retry task ID so only one retry can exist at a time |
| 634 | // and the delete handler can cancel it. |
| 635 | retryTaskID := "patch-run-" + p.PatchRunID + "-retry" |
| 636 | task, err := NewRunPatchRetryTask(p, retryTaskID) |
| 637 | if err != nil { |
| 638 | return err |
| 639 | } |
| 640 | _, err = h.queueClient.Enqueue(task, asynq.ProcessIn(5*time.Minute)) |
| 641 | if err != nil { |
| 642 | // If a task with this ID already exists (pending), that's fine - skip. |
| 643 | h.log.Debug("run_patch: re-enqueue skipped or failed", "api_id", p.ApiID, "error", err) |
| 644 | } else { |
| 645 | h.log.Info("run_patch: agent offline, re-queued in 5m", "api_id", p.ApiID, "patch_run_id", p.PatchRunID) |
| 646 | } |
| 647 | return nil |
| 648 | } |
| 649 | |
| 650 | // Build run_patch payload |
| 651 | payload := map[string]interface{}{ |
| 652 | "type": "run_patch", |
| 653 | "patch_run_id": p.PatchRunID, |
| 654 | "patch_type": p.PatchType, |
| 655 | "dry_run": p.DryRun, |
nothing calls this directly
no test coverage detected