(jobId: string)
| 523 | } |
| 524 | |
/**
 * Tries to move one schedule job from PENDING to PROCESSING without exceeding
 * the schedule execution concurrency limit.
 *
 * All steps run inside a single database transaction:
 *   1. Request a transaction-scoped advisory lock keyed on the queue name via
 *      `pg_try_advisory_xact_lock`; if it is not acquired, give up immediately.
 *   2. Count the rows of `asyncJobs` matched by `activeScheduleExecutionJobsFilter()`.
 *   3. If the count is below the limit, flip the job to PROCESSING, but only
 *      when it is still PENDING.
 *
 * @param jobId - Id of the `asyncJobs` row to start.
 * @returns `'capacity_full'` when the lock was not acquired or the active count
 *   has reached `SCHEDULE_EXECUTION_CONCURRENCY_LIMIT`; `'started'` when the row
 *   was updated; `'not_pending'` when no row with this id was in PENDING state.
 */
async function tryStartDatabaseScheduleJob(jobId: string): Promise<DatabaseScheduleStartResult> {
  // Captured once, before the transaction, and reused for both timestamps.
  const startTimestamp = new Date()

  return db.transaction(async (tx) => {
    // Serialize the count-then-update sequence across concurrent callers.
    const [lockRow] = await tx.execute<{ acquired: boolean }>(
      sql`SELECT pg_try_advisory_xact_lock(hashtextextended(${SCHEDULE_EXECUTION_QUEUE_NAME}, 0)) AS acquired`
    )
    if (!lockRow?.acquired) {
      return 'capacity_full'
    }

    const [countRow] = await tx
      .select({ count: sql<number>`count(*)` })
      .from(asyncJobs)
      .where(activeScheduleExecutionJobsFilter())

    // The count is coerced with Number(); a missing row is treated as zero.
    const activeJobCount = Number(countRow?.count ?? 0)
    if (activeJobCount >= SCHEDULE_EXECUTION_CONCURRENCY_LIMIT) {
      return 'capacity_full'
    }

    // Only a job that is still PENDING may be claimed.
    const stillPending = and(eq(asyncJobs.id, jobId), eq(asyncJobs.status, JOB_STATUS.PENDING))

    const [claimedJob] = await tx
      .update(asyncJobs)
      .set({
        status: JOB_STATUS.PROCESSING,
        startedAt: startTimestamp,
        attempts: sql`${asyncJobs.attempts} + 1`,
        updatedAt: startTimestamp,
      })
      .where(stillPending)
      .returning({ id: asyncJobs.id })

    if (claimedJob) {
      return 'started'
    }
    return 'not_pending'
  })
}
| 559 | |
| 560 | async function executeDatabaseScheduleJob( |
| 561 | jobQueue: JobQueue, |
no test coverage detected