(payload: ScheduleExecutionPayload)
| 603 | } |
| 604 | |
| 605 | export async function executeScheduleJob(payload: ScheduleExecutionPayload) { |
| 606 | const correlation = buildScheduleCorrelation(payload) |
| 607 | const executionId = correlation.executionId |
| 608 | const requestId = correlation.requestId |
| 609 | const claimedAt = getScheduleClaimedAt(payload) |
| 610 | const now = new Date() |
| 611 | const scheduledFor = payload.scheduledFor ? new Date(payload.scheduledFor) : null |
| 612 | |
| 613 | return runWithRequestContext({ requestId }, async () => { |
| 614 | logger.info(`[${requestId}] Starting schedule execution`, { |
| 615 | scheduleId: payload.scheduleId, |
| 616 | workflowId: payload.workflowId, |
| 617 | executionId, |
| 618 | scheduledFor: scheduledFor?.toISOString(), |
| 619 | claimedAt: claimedAt?.toISOString(), |
| 620 | }) |
| 621 | |
| 622 | const releaseClaim = ( |
| 623 | releaseNow: Date, |
| 624 | context: string, |
| 625 | nextRunAt?: Date | null |
| 626 | ): Promise<boolean> => |
| 627 | releaseScheduleLock(payload.scheduleId, requestId, releaseNow, context, nextRunAt, { |
| 628 | expectedLastQueuedAt: claimedAt, |
| 629 | }) |
| 630 | |
| 631 | const updateClaimedSchedule = ( |
| 632 | updates: WorkflowScheduleUpdate, |
| 633 | context: string |
| 634 | ): Promise<boolean> => |
| 635 | applyScheduleUpdate(payload.scheduleId, updates, requestId, context, { |
| 636 | expectedLastQueuedAt: claimedAt, |
| 637 | }) |
| 638 | |
| 639 | try { |
| 640 | const [scheduleRecord] = await db |
| 641 | .select({ |
| 642 | id: workflowSchedule.id, |
| 643 | workflowId: workflowSchedule.workflowId, |
| 644 | deploymentVersionId: workflowSchedule.deploymentVersionId, |
| 645 | status: workflowSchedule.status, |
| 646 | archivedAt: workflowSchedule.archivedAt, |
| 647 | lastQueuedAt: workflowSchedule.lastQueuedAt, |
| 648 | }) |
| 649 | .from(workflowSchedule) |
| 650 | .where(eq(workflowSchedule.id, payload.scheduleId)) |
| 651 | .limit(1) |
| 652 | |
| 653 | if (!scheduleRecord) { |
| 654 | logger.info(`[${requestId}] Schedule no longer exists, skipping execution`, { |
| 655 | scheduleId: payload.scheduleId, |
| 656 | }) |
| 657 | return |
| 658 | } |
| 659 | |
| 660 | if ( |
| 661 | claimedAt && |
| 662 | (!scheduleRecord.lastQueuedAt || |
no test coverage detected