( options: PreprocessExecutionOptions )
| 100 | type SubscriptionInfo = HighestPrioritySubscription |
| 101 | |
| 102 | export async function preprocessExecution( |
| 103 | options: PreprocessExecutionOptions |
| 104 | ): Promise<PreprocessExecutionResult> { |
| 105 | const { |
| 106 | workflowId, |
| 107 | userId, |
| 108 | triggerType, |
| 109 | executionId, |
| 110 | requestId, |
| 111 | checkRateLimit = triggerType !== 'manual' && triggerType !== 'chat', |
| 112 | checkDeployment = triggerType !== 'manual', |
| 113 | skipUsageLimits = false, |
| 114 | skipConcurrencyReservation = false, |
| 115 | logPreprocessingErrors = true, |
| 116 | workspaceId: providedWorkspaceId, |
| 117 | loggingSession: providedLoggingSession, |
| 118 | triggerData, |
| 119 | isResumeContext: _isResumeContext = false, |
| 120 | useAuthenticatedUserAsActor = false, |
| 121 | workflowRecord: prefetchedWorkflowRecord, |
| 122 | resolvedActorUserId, |
| 123 | } = options |
| 124 | |
| 125 | // When `logPreprocessingErrors` is false the caller surfaces failures itself |
| 126 | // (e.g. table cells use cell state / SSE), so skip the execution-log writes. |
| 127 | const recordPreprocessingError: typeof logPreprocessingError = (args) => |
| 128 | logPreprocessingErrors ? logPreprocessingError(args) : Promise.resolve() |
| 129 | |
| 130 | logger.info(`[${requestId}] Starting execution preprocessing`, { |
| 131 | workflowId, |
| 132 | userId, |
| 133 | triggerType, |
| 134 | executionId, |
| 135 | }) |
| 136 | |
| 137 | // ========== STEP 1: Validate Workflow Exists ========== |
| 138 | if (prefetchedWorkflowRecord && prefetchedWorkflowRecord.id !== workflowId) { |
| 139 | logger.error(`[${requestId}] Prefetched workflow record ID mismatch`, { |
| 140 | expected: workflowId, |
| 141 | received: prefetchedWorkflowRecord.id, |
| 142 | }) |
| 143 | throw new Error( |
| 144 | `Prefetched workflow record ID mismatch: expected ${workflowId}, got ${prefetchedWorkflowRecord.id}` |
| 145 | ) |
| 146 | } |
| 147 | let workflowRecord: WorkflowRecord | null = prefetchedWorkflowRecord ?? null |
| 148 | if (!workflowRecord) { |
| 149 | try { |
| 150 | workflowRecord = await getActiveWorkflowRecord(workflowId) |
| 151 | |
| 152 | if (!workflowRecord) { |
| 153 | logger.warn(`[${requestId}] Workflow not found: ${workflowId}`) |
| 154 | |
| 155 | await recordPreprocessingError({ |
| 156 | workflowId, |
| 157 | executionId, |
| 158 | triggerType, |
| 159 | requestId, |
no test coverage detected