( schedule: ClaimedSchedule, queuedAt: Date, requestId: string, jobQueue: JobQueue, useDatabaseFallback: boolean )
| 731 | } |
| 732 | |
| 733 | async function processScheduleItem( |
| 734 | schedule: ClaimedSchedule, |
| 735 | queuedAt: Date, |
| 736 | requestId: string, |
| 737 | jobQueue: JobQueue, |
| 738 | useDatabaseFallback: boolean |
| 739 | ) { |
| 740 | const queueTime = schedule.lastQueuedAt ?? queuedAt |
| 741 | const executionId = generateId() |
| 742 | const correlation = { |
| 743 | executionId, |
| 744 | requestId, |
| 745 | source: 'schedule' as const, |
| 746 | workflowId: schedule.workflowId!, |
| 747 | scheduleId: schedule.id, |
| 748 | triggerType: 'schedule', |
| 749 | scheduledFor: schedule.nextRunAt?.toISOString(), |
| 750 | } |
| 751 | |
| 752 | const payload = { |
| 753 | scheduleId: schedule.id, |
| 754 | workflowId: schedule.workflowId!, |
| 755 | executionId, |
| 756 | requestId, |
| 757 | correlation, |
| 758 | blockId: schedule.blockId || undefined, |
| 759 | workspaceId: schedule.workspaceId || undefined, |
| 760 | deploymentVersionId: schedule.deploymentVersionId || undefined, |
| 761 | cronExpression: schedule.cronExpression || undefined, |
| 762 | timezone: schedule.timezone || undefined, |
| 763 | lastRanAt: schedule.lastRanAt?.toISOString(), |
| 764 | failedCount: schedule.failedCount || 0, |
| 765 | infraRetryCount: schedule.infraRetryCount || 0, |
| 766 | now: queueTime.toISOString(), |
| 767 | scheduledFor: schedule.nextRunAt?.toISOString(), |
| 768 | } |
| 769 | |
| 770 | let enqueuedJobId: string | null = null |
| 771 | |
| 772 | try { |
| 773 | const delayMs = Math.floor(Math.random() * SCHEDULE_JITTER_MAX_MS) |
| 774 | |
| 775 | const scheduleJobId = buildScheduleExecutionJobId(schedule) |
| 776 | const existingJob = await jobQueue.getJob(scheduleJobId) |
| 777 | if (existingJob && ['pending', 'processing'].includes(existingJob.status)) { |
| 778 | const activeJobPayload = getSchedulePayloadFromJob(existingJob) |
| 779 | const activeJobClaim = getSchedulePayloadClaimedAt(activeJobPayload) |
| 780 | |
| 781 | if (useDatabaseFallback && isStaleDatabaseScheduleJob(existingJob)) { |
| 782 | await recoverStaleDatabaseScheduleJobs(new Date()) |
| 783 | logger.info(`[${requestId}] Recovered stale database schedule execution jobs`, { |
| 784 | scheduleId: schedule.id, |
| 785 | jobId: scheduleJobId, |
| 786 | }) |
| 787 | } |
| 788 | |
| 789 | const databaseJob = useDatabaseFallback ? await jobQueue.getJob(scheduleJobId) : existingJob |
| 790 | const databaseJobPayload = databaseJob ? getSchedulePayloadFromJob(databaseJob) : null |
no test coverage detected