MCPcopy Index your code
hub / github.com/simstudioai/sim / insertAll

Function insertAll

apps/sim/lib/data-drains/destinations/bigquery.ts:162–276  ·  view source on GitHub ↗

* Streams a chunk of rows to `tabledata.insertAll`. * * Partial-success caveat: BigQuery may return HTTP 200 with a non-empty * `insertErrors` array. Rows not listed there are inserted and dedup-keyed by * `insertId` for ~60s. We throw on any `insertErrors`; retries within the * dedup window ar

(input: InsertAllInput)

Source from the content-addressed store, hash-verified

160 * dedup window are safe, but retries after it may duplicate succeeded rows.
161 */
162async function insertAll(input: InsertAllInput): Promise<void> {
163 if (input.rows.length > MAX_ROWS_PER_REQUEST) {
164 throw new Error(
165 `BigQuery insertAll chunk has ${input.rows.length} rows, exceeds the ${MAX_ROWS_PER_REQUEST} per-request limit`
166 )
167 }
168 const url = `https://bigquery.googleapis.com/bigquery/v2/projects/${encodeURIComponent(input.config.projectId)}/datasets/${encodeURIComponent(input.config.datasetId)}/tables/${encodeURIComponent(input.config.tableId)}/insertAll`
169 /**
170 * `skipInvalidRows: false` and `ignoreUnknownValues: false` surface schema
171 * mismatches as `insertErrors` instead of silently dropping data — drains
172 * should fail loudly so operators notice the schema drift.
173 */
174 const payload = {
175 skipInvalidRows: false,
176 ignoreUnknownValues: false,
177 rows: input.rows.map((row, index) => {
178 const rowBytes = Buffer.byteLength(JSON.stringify(row), 'utf8')
179 if (rowBytes > MAX_ROW_BYTES) {
180 throw new Error(
181 `BigQuery row at index ${index} is ${rowBytes} bytes, exceeds the ${MAX_ROW_BYTES}-byte per-row limit`
182 )
183 }
184 return {
185 insertId: buildInsertId(input.metadata, index),
186 json: row,
187 }
188 }),
189 }
190 const body = JSON.stringify(payload)
191 const byteLength = Buffer.byteLength(body, 'utf8')
192 if (byteLength > MAX_REQUEST_BYTES) {
193 throw new Error(
194 `BigQuery insertAll body is ${byteLength} bytes, exceeds the ${MAX_REQUEST_BYTES}-byte per-request limit`
195 )
196 }
197 let attempt = 0
198 let response: Response | undefined
199 let refreshedOnce = false
200 while (true) {
201 attempt++
202 try {
203 response = await postInsertAll(input, url, body)
204 /** A 401 retry doesn't count against the 5xx/429 budget — token refresh is a one-shot recovery. */
205 if (response.status === 401 && !refreshedOnce) {
206 refreshedOnce = true
207 logger.debug('BigQuery returned 401; refreshing access token and retrying once')
208 /** Drain the 401 body before discarding so undici can return the socket to the keep-alive pool. */
209 await response.text().catch(() => '')
210 response = await postInsertAll(input, url, body, true)
211 }
212 if (!RETRYABLE_STATUSES.has(response.status)) break
213 if (attempt >= MAX_RETRY_ATTEMPTS) break
214 const retryAfterHeaderMs = parseRetryAfter(response.headers.get('retry-after'))
215 const retryAfterMs = backoffWithJitter(attempt, retryAfterHeaderMs, {
216 baseMs: BASE_RETRY_DELAY_MS,
217 })
218 logger.warn('BigQuery insertAll transient error; retrying', {
219 status: response.status,

Callers 1

deliverFunction · 0.85

Calls 11

parseRetryAfterFunction · 0.90
backoffWithJitterFunction · 0.90
sleepUntilAbortedFunction · 0.90
toErrorFunction · 0.90
buildInsertIdFunction · 0.85
postInsertAllFunction · 0.85
debugMethod · 0.80
textMethod · 0.80
joinMethod · 0.80
getMethod · 0.65
warnMethod · 0.65

Tested by

no test coverage detected