( jobQueue: JobQueue, jobId: string, payload: ScheduleExecutionPayload, schedule: DatabaseScheduleExecutionTarget, queuedAt: Date, requestId: string, delayMs: number )
| 558 | } |
| 559 | |
/**
 * Runs one database-backed schedule execution job inline, optionally after a delay.
 *
 * Flow:
 * 1. Sleeps for `delayMs` when it is positive.
 * 2. Attempts to start the job via `tryStartDatabaseScheduleJob`, wrapped in
 *    `runWithDatabaseScheduleStartTurn`. If the result is `'not_pending'` or
 *    `'capacity_full'`, the outcome is logged and the function returns without
 *    executing anything.
 * 3. Executes the schedule payload and records the output on the job queue.
 * 4. On any error from step 3, logs it, marks the job failed, and releases the
 *    schedule lock.
 *
 * @param jobQueue - Queue used to record the job as completed or failed.
 * @param jobId - Identifier of the queued job to start and execute.
 * @param payload - Payload handed to `executeScheduleJob`.
 * @param schedule - Schedule being executed; its `id` and `workflowId` are used
 *   for logging and for releasing the lock.
 * @param queuedAt - Passed as `expectedLastQueuedAt` when releasing the lock.
 *   NOTE(review): presumably guards against releasing a lock that a newer
 *   queueing has since taken over — confirm against `releaseScheduleLock`.
 * @param requestId - Correlation id prefixed to log messages.
 * @param delayMs - Milliseconds to wait before attempting to start; values
 *   `<= 0` skip the wait.
 * @returns Resolves once the job has been skipped, completed, or marked failed.
 * @throws Rejects if `markJobFailed` or `releaseScheduleLock` rejects in the
 *   failure path; the lock release is still attempted in the former case.
 */
async function executeDatabaseScheduleJob(
  jobQueue: JobQueue,
  jobId: string,
  payload: ScheduleExecutionPayload,
  schedule: DatabaseScheduleExecutionTarget,
  queuedAt: Date,
  requestId: string,
  delayMs: number
): Promise<void> {
  if (delayMs > 0) await sleep(delayMs)

  const startResult = await runWithDatabaseScheduleStartTurn(() =>
    tryStartDatabaseScheduleJob(jobId)
  )
  if (startResult === 'not_pending') {
    logger.info(`[${requestId}] Database schedule execution job is no longer pending`, {
      scheduleId: schedule.id,
      workflowId: schedule.workflowId,
      jobId,
    })
    return
  }

  if (startResult === 'capacity_full') {
    logger.info(`[${requestId}] Deferred database schedule execution because capacity is full`, {
      scheduleId: schedule.id,
      workflowId: schedule.workflowId,
      jobId,
      concurrencyLimit: SCHEDULE_EXECUTION_CONCURRENCY_LIMIT,
    })
    return
  }

  try {
    const output = await executeScheduleJob(payload)
    await jobQueue.completeJob(jobId, output ?? null)
  } catch (error) {
    const errorMessage = toError(error).message
    logger.error(`[${requestId}] Schedule execution failed for workflow ${schedule.workflowId}`, {
      scheduleId: schedule.id,
      jobId,
      error: errorMessage,
    })
    try {
      await jobQueue.markJobFailed(jobId, errorMessage)
    } finally {
      // Release the lock even when recording the failure itself rejects;
      // otherwise a queue-store error would leave the schedule lock held.
      await releaseScheduleLock(
        schedule.id,
        requestId,
        new Date(),
        `Failed to release lock for schedule ${schedule.id} after inline execution failure`,
        undefined,
        { expectedLastQueuedAt: queuedAt }
      )
    }
  }
}
| 614 | |
| 615 | async function getPendingDatabaseScheduleJobs(limit: number) { |
| 616 | if (limit <= 0) return [] |
no test coverage detected