MCPcopy
hub / github.com/simstudioai/sim / flush

Function flush

apps/sim/lib/table/import-runner.ts:195–247  ·  view source on GitHub ↗
(rows: Record<string, unknown>[])

Source from the content-addressed store, hash-verified

193 }
194
195 const flush = async (rows: Record<string, unknown>[]) => {
196 if (rows.length === 0 || !schema || !headerToColumn) return
197 // Ownership gate before every insert: once this run loses the table (cancel/supersede),
198 // updateJobProgress returns false and we stop before writing into a table a newer import
199 // may own. Runs per batch (not just at the emit cadence) so we stop within one batch.
200 const owns = await updateJobProgress(tableId, inserted, importId)
201 if (!owns) throw new ImportSupersededError()
202 const coerced = coerceRowsForTable(rows, schema, headerToColumn)
203 const rowLimit = await assertRowCapacity({
204 workspaceId,
205 currentRowCount: existingRowCount + inserted,
206 addedRows: coerced.length,
207 })
208 const result = await bulkInsertImportBatch(
209 {
210 tableId,
211 workspaceId,
212 userId,
213 rows: coerced,
214 startPosition: basePosition + inserted,
215 afterOrderKey: lastOrderKey,
216 },
217 { ...table, schema },
218 requestId
219 )
220 notifyTableRowUsage({
221 workspaceId,
222 currentRowCount: existingRowCount + inserted,
223 addedRows: result.inserted,
224 limit: rowLimit,
225 })
226 inserted += result.inserted
227 lastOrderKey = result.lastOrderKey
228 // Emit after the first batch, then every interval, so the bar appears early without flooding.
229 if (
230 inserted - lastReported >= PROGRESS_INTERVAL_ROWS ||
231 (lastReported === 0 && inserted > 0)
232 ) {
233 lastReported = inserted
234 // Exact, monotonic completion from bytes consumed — no wobbly row estimate.
235 const percent =
236 totalBytes > 0 ? Math.min(99, Math.round((bytesRead / totalBytes) * 100)) : undefined
237 void appendTableEvent({
238 kind: 'job',
239 type: 'import',
240 tableId,
241 jobId: importId,
242 status: 'running',
243 progress: inserted,
244 percent,
245 })
246 }
247 }
248
249 let ready = false
250 for await (const record of parser as AsyncIterable<Record<string, unknown>>) {

Callers 1

runTableImportFunction · 0.70

Calls 6

updateJobProgressFunction · 0.90
coerceRowsForTableFunction · 0.90
assertRowCapacityFunction · 0.90
bulkInsertImportBatchFunction · 0.90
notifyTableRowUsageFunction · 0.90
appendTableEventFunction · 0.90

Tested by

no test coverage detected