MCPcopy Index your code
hub / github.com/simstudioai/sim / processScheduleItem

Function processScheduleItem

apps/sim/app/api/schedules/execute/route.ts:733–1031  ·  view source on GitHub ↗
(
  schedule: ClaimedSchedule,
  queuedAt: Date,
  requestId: string,
  jobQueue: JobQueue,
  useDatabaseFallback: boolean
)

Source from the content-addressed store, hash-verified

731}
732
733async function processScheduleItem(
734 schedule: ClaimedSchedule,
735 queuedAt: Date,
736 requestId: string,
737 jobQueue: JobQueue,
738 useDatabaseFallback: boolean
739) {
740 const queueTime = schedule.lastQueuedAt ?? queuedAt
741 const executionId = generateId()
742 const correlation = {
743 executionId,
744 requestId,
745 source: 'schedule' as const,
746 workflowId: schedule.workflowId!,
747 scheduleId: schedule.id,
748 triggerType: 'schedule',
749 scheduledFor: schedule.nextRunAt?.toISOString(),
750 }
751
752 const payload = {
753 scheduleId: schedule.id,
754 workflowId: schedule.workflowId!,
755 executionId,
756 requestId,
757 correlation,
758 blockId: schedule.blockId || undefined,
759 workspaceId: schedule.workspaceId || undefined,
760 deploymentVersionId: schedule.deploymentVersionId || undefined,
761 cronExpression: schedule.cronExpression || undefined,
762 timezone: schedule.timezone || undefined,
763 lastRanAt: schedule.lastRanAt?.toISOString(),
764 failedCount: schedule.failedCount || 0,
765 infraRetryCount: schedule.infraRetryCount || 0,
766 now: queueTime.toISOString(),
767 scheduledFor: schedule.nextRunAt?.toISOString(),
768 }
769
770 let enqueuedJobId: string | null = null
771
772 try {
773 const delayMs = Math.floor(Math.random() * SCHEDULE_JITTER_MAX_MS)
774
775 const scheduleJobId = buildScheduleExecutionJobId(schedule)
776 const existingJob = await jobQueue.getJob(scheduleJobId)
777 if (existingJob && ['pending', 'processing'].includes(existingJob.status)) {
778 const activeJobPayload = getSchedulePayloadFromJob(existingJob)
779 const activeJobClaim = getSchedulePayloadClaimedAt(activeJobPayload)
780
781 if (useDatabaseFallback && isStaleDatabaseScheduleJob(existingJob)) {
782 await recoverStaleDatabaseScheduleJobs(new Date())
783 logger.info(`[${requestId}] Recovered stale database schedule execution jobs`, {
784 scheduleId: schedule.id,
785 jobId: scheduleJobId,
786 })
787 }
788
789 const databaseJob = useDatabaseFallback ? await jobQueue.getJob(scheduleJobId) : existingJob
790 const databaseJobPayload = databaseJob ? getSchedulePayloadFromJob(databaseJob) : null

Callers 1

runScheduleTick Function · 0.85

Calls 15

generateId Function · 0.90
releaseScheduleLock Function · 0.90
isStaleScheduleClaim Function · 0.85
restoreScheduleClaim Function · 0.85

Tested by

no test coverage detected