(queuedAt: Date, limit: number)
| 172 | } |
| 173 | |
/**
 * Claims up to `limit` job schedules that match `jobScheduleFilter(queuedAt)`.
 *
 * Everything runs inside one transaction:
 *   1. select the ids of matching rows with `FOR UPDATE SKIP LOCKED`, capped at `limit`;
 *   2. update those rows, writing `queuedAt` into both `lastQueuedAt` and `updatedAt`.
 *
 * @param queuedAt - Timestamp handed to the filter and written to the claimed rows.
 * @param limit - Maximum number of rows to select; a value of zero or less
 *   returns an empty array without opening a transaction.
 * @returns The updated rows (id, cronExpression, timezone, failedCount,
 *   lastQueuedAt, sourceType), or an empty array when nothing was selected.
 */
async function claimJobSchedules(queuedAt: Date, limit: number) {
  if (limit <= 0) {
    return []
  }

  return db.transaction(async (trx) => {
    // Lock the candidate rows; `skipLocked` asks the database to skip rows
    // that are already locked instead of waiting on them.
    const candidates = await trx
      .select({ id: workflowSchedule.id })
      .from(workflowSchedule)
      .where(jobScheduleFilter(queuedAt))
      .for('update', { skipLocked: true })
      .limit(limit)

    if (candidates.length === 0) {
      return []
    }

    const candidateIds = candidates.map((candidate) => candidate.id)

    // The update re-applies the same filter in addition to the id list.
    return trx
      .update(workflowSchedule)
      .set({ lastQueuedAt: queuedAt, updatedAt: queuedAt })
      .where(and(jobScheduleFilter(queuedAt), inArray(workflowSchedule.id, candidateIds)))
      .returning({
        id: workflowSchedule.id,
        cronExpression: workflowSchedule.cronExpression,
        timezone: workflowSchedule.timezone,
        failedCount: workflowSchedule.failedCount,
        lastQueuedAt: workflowSchedule.lastQueuedAt,
        sourceType: workflowSchedule.sourceType,
      })
  })
}
| 209 | |
/** Element type of the array that `claimWorkflowSchedules` resolves to. */
type ClaimedSchedule = Awaited<ReturnType<typeof claimWorkflowSchedules>>[number]

/** Element type of the array that `claimJobSchedules` resolves to. */
type ClaimedJob = Awaited<ReturnType<typeof claimJobSchedules>>[number]
no test coverage detected