* Streams a chunk of rows to `tabledata.insertAll`. * * Partial-success caveat: BigQuery may return HTTP 200 with a non-empty * `insertErrors` array. Rows not listed there are inserted and dedup-keyed by * `insertId` for ~60s. We throw on any `insertErrors`; retries within the * dedup window are safe, but retries after it may duplicate succeeded rows.
(input: InsertAllInput)
| 160 | * dedup window are safe, but retries after it may duplicate succeeded rows. |
| 161 | */ |
| 162 | async function insertAll(input: InsertAllInput): Promise<void> { |
| 163 | if (input.rows.length > MAX_ROWS_PER_REQUEST) { |
| 164 | throw new Error( |
| 165 | `BigQuery insertAll chunk has ${input.rows.length} rows, exceeds the ${MAX_ROWS_PER_REQUEST} per-request limit` |
| 166 | ) |
| 167 | } |
| 168 | const url = `https://bigquery.googleapis.com/bigquery/v2/projects/${encodeURIComponent(input.config.projectId)}/datasets/${encodeURIComponent(input.config.datasetId)}/tables/${encodeURIComponent(input.config.tableId)}/insertAll` |
| 169 | /** |
| 170 | * `skipInvalidRows: false` and `ignoreUnknownValues: false` surface schema |
| 171 | * mismatches as `insertErrors` instead of silently dropping data — drains |
| 172 | * should fail loudly so operators notice the schema drift. |
| 173 | */ |
| 174 | const payload = { |
| 175 | skipInvalidRows: false, |
| 176 | ignoreUnknownValues: false, |
| 177 | rows: input.rows.map((row, index) => { |
| 178 | const rowBytes = Buffer.byteLength(JSON.stringify(row), 'utf8') |
| 179 | if (rowBytes > MAX_ROW_BYTES) { |
| 180 | throw new Error( |
| 181 | `BigQuery row at index ${index} is ${rowBytes} bytes, exceeds the ${MAX_ROW_BYTES}-byte per-row limit` |
| 182 | ) |
| 183 | } |
| 184 | return { |
| 185 | insertId: buildInsertId(input.metadata, index), |
| 186 | json: row, |
| 187 | } |
| 188 | }), |
| 189 | } |
| 190 | const body = JSON.stringify(payload) |
| 191 | const byteLength = Buffer.byteLength(body, 'utf8') |
| 192 | if (byteLength > MAX_REQUEST_BYTES) { |
| 193 | throw new Error( |
| 194 | `BigQuery insertAll body is ${byteLength} bytes, exceeds the ${MAX_REQUEST_BYTES}-byte per-request limit` |
| 195 | ) |
| 196 | } |
| 197 | let attempt = 0 |
| 198 | let response: Response | undefined |
| 199 | let refreshedOnce = false |
| 200 | while (true) { |
| 201 | attempt++ |
| 202 | try { |
| 203 | response = await postInsertAll(input, url, body) |
| 204 | /** A 401 retry doesn't count against the 5xx/429 budget — token refresh is a one-shot recovery. */ |
| 205 | if (response.status === 401 && !refreshedOnce) { |
| 206 | refreshedOnce = true |
| 207 | logger.debug('BigQuery returned 401; refreshing access token and retrying once') |
| 208 | /** Drain the 401 body before discarding so undici can return the socket to the keep-alive pool. */ |
| 209 | await response.text().catch(() => '') |
| 210 | response = await postInsertAll(input, url, body, true) |
| 211 | } |
| 212 | if (!RETRYABLE_STATUSES.has(response.status)) break |
| 213 | if (attempt >= MAX_RETRY_ATTEMPTS) break |
| 214 | const retryAfterHeaderMs = parseRetryAfter(response.headers.get('retry-after')) |
| 215 | const retryAfterMs = backoffWithJitter(attempt, retryAfterHeaderMs, { |
| 216 | baseMs: BASE_RETRY_DELAY_MS, |
| 217 | }) |
| 218 | logger.warn('BigQuery insertAll transient error; retrying', { |
| 219 | status: response.status, |
no test coverage detected