(requestId: string)
| 1069 | * the `FOR UPDATE SKIP LOCKED` claim layer, not this function. |
| 1070 | */ |
| 1071 | export async function runScheduleTick(requestId: string): Promise<ScheduleTickResult> { |
| 1072 | const tickStart = Date.now() |
| 1073 | |
| 1074 | const jobQueue = await getJobQueue() |
| 1075 | const useDatabaseFallback = shouldExecuteInline() |
| 1076 | let totalSchedules = 0 |
| 1077 | let totalJobs = 0 |
| 1078 | let iterations = 0 |
| 1079 | let remainingWorkflowBudget = SCHEDULE_WORKFLOW_ENQUEUE_LIMIT |
| 1080 | let schedulesExhausted = false |
| 1081 | let jobsExhausted = false |
| 1082 | |
| 1083 | while (Date.now() - tickStart < MAX_TICK_DURATION_MS) { |
| 1084 | if (schedulesExhausted && jobsExhausted) break |
| 1085 | const queuedAt = new Date() |
| 1086 | let resumedPendingSchedules = 0 |
| 1087 | let databaseScheduleSlots = SCHEDULE_EXECUTION_CONCURRENCY_LIMIT |
| 1088 | |
| 1089 | if (useDatabaseFallback) { |
| 1090 | await recoverStaleDatabaseScheduleJobs(queuedAt) |
| 1091 | databaseScheduleSlots = await getDatabaseScheduleExecutionSlots() |
| 1092 | resumedPendingSchedules = await resumePendingDatabaseScheduleJobs( |
| 1093 | jobQueue, |
| 1094 | requestId, |
| 1095 | databaseScheduleSlots |
| 1096 | ) |
| 1097 | databaseScheduleSlots = await getDatabaseScheduleExecutionSlots() |
| 1098 | } |
| 1099 | |
| 1100 | const workflowClaimLimit = Math.min( |
| 1101 | WORKFLOW_CHUNK_SIZE, |
| 1102 | remainingWorkflowBudget, |
| 1103 | useDatabaseFallback ? databaseScheduleSlots : WORKFLOW_CHUNK_SIZE |
| 1104 | ) |
| 1105 | |
| 1106 | if (useDatabaseFallback && workflowClaimLimit <= 0) { |
| 1107 | schedulesExhausted = true |
| 1108 | } |
| 1109 | |
| 1110 | const [dueSchedules, dueJobs] = await Promise.all([ |
| 1111 | schedulesExhausted ? [] : claimWorkflowSchedules(queuedAt, workflowClaimLimit), |
| 1112 | jobsExhausted ? [] : claimJobSchedules(queuedAt, JOB_CHUNK_SIZE), |
| 1113 | ]) |
| 1114 | |
| 1115 | remainingWorkflowBudget -= dueSchedules.length |
| 1116 | if (dueSchedules.length < workflowClaimLimit || remainingWorkflowBudget <= 0) { |
| 1117 | schedulesExhausted = true |
| 1118 | } |
| 1119 | if (dueJobs.length < JOB_CHUNK_SIZE) jobsExhausted = true |
| 1120 | |
| 1121 | if (dueSchedules.length === 0 && dueJobs.length === 0 && resumedPendingSchedules === 0) break |
| 1122 | |
| 1123 | iterations += 1 |
| 1124 | totalSchedules += dueSchedules.length + resumedPendingSchedules |
| 1125 | totalJobs += dueJobs.length |
| 1126 | |
| 1127 | logger.info( |
| 1128 | `[${requestId}] Iteration ${iterations}: claimed ${dueSchedules.length} schedules, resumed ${resumedPendingSchedules} pending schedule jobs, ${dueJobs.length} jobs`, |
no test coverage detected