(queuedAt: Date, limit: number)
| 120 | } |
| 121 | |
/**
 * Claims up to `limit` workflow schedules that match
 * `workflowScheduleFilter(queuedAt)` and marks them as queued.
 *
 * Everything runs inside one transaction:
 *  1. Select candidate schedule ids (plus the owning workflow's `workspaceId`)
 *     with a row lock that skips rows already locked elsewhere.
 *  2. Set `lastQueuedAt` and `updatedAt` to `queuedAt` on those candidates,
 *     re-applying the same filter in the UPDATE.
 *  3. Return the updated rows, each extended with its `workspaceId`.
 *
 * @param queuedAt Timestamp passed to the filter and written to the claimed rows.
 * @param limit Maximum number of schedules to claim; values `<= 0` claim nothing.
 * @returns The claimed schedule rows with `workspaceId` attached (`null` when the
 *          lookup has no entry or the stored value is nullish); an empty array
 *          when nothing was claimed.
 */
async function claimWorkflowSchedules(queuedAt: Date, limit: number) {
  // Nothing to claim: skip opening a transaction entirely.
  if (limit <= 0) {
    return []
  }

  return db.transaction(async (tx) => {
    // NOTE(review): the lock clause has no `of` target and the query joins
    // `workflow`, so the joined workflow rows are presumably locked as well —
    // confirm that this is intended.
    const candidates = await tx
      .select({ id: workflowSchedule.id, workspaceId: workflow.workspaceId })
      .from(workflowSchedule)
      .innerJoin(workflow, eq(workflowSchedule.workflowId, workflow.id))
      .where(workflowScheduleFilter(queuedAt))
      .for('update', { skipLocked: true })
      .limit(limit)

    if (candidates.length === 0) {
      return []
    }

    // The UPDATE only returns schedule columns, so keep the joined workspace id
    // around to re-attach it to the claimed rows afterwards.
    const workspaceIdLookup = new Map(
      candidates.map((candidate) => [candidate.id, candidate.workspaceId])
    )
    const candidateIds = candidates.map((candidate) => candidate.id)

    // Columns handed back to the caller for every claimed schedule.
    const claimedColumns = {
      id: workflowSchedule.id,
      workflowId: workflowSchedule.workflowId,
      blockId: workflowSchedule.blockId,
      cronExpression: workflowSchedule.cronExpression,
      lastRanAt: workflowSchedule.lastRanAt,
      failedCount: workflowSchedule.failedCount,
      infraRetryCount: workflowSchedule.infraRetryCount,
      nextRunAt: workflowSchedule.nextRunAt,
      lastQueuedAt: workflowSchedule.lastQueuedAt,
      timezone: workflowSchedule.timezone,
      deploymentVersionId: workflowSchedule.deploymentVersionId,
      sourceType: workflowSchedule.sourceType,
    }

    // The filter is applied again here, so only candidates that still match it
    // are stamped and returned.
    const claimed = await tx
      .update(workflowSchedule)
      .set({ lastQueuedAt: queuedAt, updatedAt: queuedAt })
      .where(and(workflowScheduleFilter(queuedAt), inArray(workflowSchedule.id, candidateIds)))
      .returning(claimedColumns)

    return claimed.map((schedule) => ({
      ...schedule,
      workspaceId: workspaceIdLookup.get(schedule.id) ?? null,
    }))
  })
}
| 173 | |
| 174 | async function claimJobSchedules(queuedAt: Date, limit: number) { |
| 175 | if (limit <= 0) return [] |
no test coverage detected