| 210 | } |
| 211 | |
| 212 | async function executeStatement(input: ExecuteInput): Promise<void> { |
| 213 | for (const value of input.bindings) { |
| 214 | const bytes = Buffer.byteLength(value, 'utf8') |
| 215 | if (bytes > VARIANT_MAX_BYTES) { |
| 216 | throw new Error( |
| 217 | `Snowflake VARIANT value exceeds 16 MB limit (got ${bytes} bytes); split the row before delivery` |
| 218 | ) |
| 219 | } |
| 220 | } |
| 221 | const baseUrl = `https://${input.config.account}.snowflakecomputing.com/api/v2/statements` |
| 222 | const bindings: Record<string, { type: 'TEXT'; value: string }> = {} |
| 223 | input.bindings.forEach((value, index) => { |
| 224 | bindings[(index + 1).toString()] = { type: 'TEXT', value } |
| 225 | }) |
| 226 | const body = { |
| 227 | statement: input.statement, |
| 228 | timeout: SQL_API_TIMEOUT_SECONDS, |
| 229 | warehouse: input.config.warehouse, |
| 230 | role: input.config.role, |
| 231 | bindings, |
| 232 | } |
| 233 | const serializedBody = JSON.stringify(body) |
| 234 | /** Stable per-request UUID enables idempotent retries via `retry=true` on subsequent attempts. */ |
| 235 | const requestId = generateId() |
| 236 | |
| 237 | let lastError: unknown |
| 238 | for (let attempt = 1; attempt <= EXECUTE_MAX_ATTEMPTS; attempt++) { |
| 239 | if (input.signal.aborted) throw input.signal.reason ?? new Error('Aborted') |
| 240 | /** Acquire JWT before starting the per-attempt timer so token signing doesn't eat the network budget (mirrors pollStatement). */ |
| 241 | const jwt = await input.getJwt() |
| 242 | const perAttempt = AbortSignal.any([input.signal, AbortSignal.timeout(PER_ATTEMPT_TIMEOUT_MS)]) |
| 243 | const params = new URLSearchParams({ requestId }) |
| 244 | if (attempt > 1) params.set('retry', 'true') |
| 245 | const url = `${baseUrl}?${params.toString()}` |
| 246 | let response: Response |
| 247 | try { |
| 248 | response = await fetch(url, { |
| 249 | method: 'POST', |
| 250 | headers: { |
| 251 | Authorization: `Bearer ${jwt}`, |
| 252 | 'Content-Type': 'application/json', |
| 253 | Accept: 'application/json', |
| 254 | 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT', |
| 255 | 'User-Agent': 'sim-data-drain/1.0', |
| 256 | }, |
| 257 | body: serializedBody, |
| 258 | signal: perAttempt, |
| 259 | }) |
| 260 | } catch (error) { |
| 261 | lastError = error |
| 262 | logger.warn('Snowflake request failed', { |
| 263 | attempt, |
| 264 | error: toError(error).message, |
| 265 | }) |
| 266 | if (input.signal.aborted || attempt === EXECUTE_MAX_ATTEMPTS) throw error |
| 267 | await sleepUntilAborted( |
| 268 | backoffWithJitter(attempt, null, { |
| 269 | baseMs: EXECUTE_RETRY_BASE_DELAY_MS, |