MCPcopy
hub / github.com/simstudioai/sim / maybeBackfillGroupOutputs

Function maybeBackfillGroupOutputs

apps/sim/lib/table/backfill-runner.ts:264–345  ·  view source on GitHub ↗
(opts: {
  table: TableDefinition
  groupId: string
  outputs: WorkflowGroupOutput[]
  overwrite: boolean
  requestId: string
  actorUserId?: string | null
})

Source from the content-addressed store, hash-verified

262 * change" posture (the data stays backfillable).
263 */
264export async function maybeBackfillGroupOutputs(opts: {
265 table: TableDefinition
266 groupId: string
267 outputs: WorkflowGroupOutput[]
268 overwrite: boolean
269 requestId: string
270 actorUserId?: string | null
271}): Promise<void> {
272 const { table, groupId, outputs, overwrite, requestId, actorUserId } = opts
273 if (outputs.length === 0) return
274
275 const [{ count: completedCount }] = await db
276 .select({ count: count() })
277 .from(tableRowExecutions)
278 .where(
279 and(
280 eq(tableRowExecutions.tableId, table.id),
281 eq(tableRowExecutions.groupId, groupId),
282 eq(tableRowExecutions.status, 'completed')
283 )
284 )
285 const total = Number(completedCount)
286 if (total === 0) return
287
288 if (total <= BACKFILL_ASYNC_THRESHOLD_ROWS) {
289 // Inline: page without job machinery so memory stays bounded but the caller can await
290 // full consistency.
291 let afterRowId: string | undefined
292 while (true) {
293 const execs = await selectCompletedExecPage(table.id, groupId, afterRowId, BACKFILL_PAGE_SIZE)
294 if (execs.length === 0) break
295 afterRowId = execs[execs.length - 1].rowId
296 await processBackfillPage({ table, outputs, overwrite, execs, requestId, actorUserId })
297 }
298 return
299 }
300
301 const jobId = generateId()
302 const jobPayload: TableBackfillJobPayload = { groupId, outputs, overwrite }
303 const claimed = await markTableJobRunning(table.id, jobId, 'backfill', jobPayload)
304 if (!claimed) {
305 logger.warn(
306 `[${requestId}] Skipping backfill for table ${table.id} group ${groupId}: another job is running`
307 )
308 return
309 }
310
311 const payload: TableBackfillPayload = {
312 jobId,
313 tableId: table.id,
314 workspaceId: table.workspaceId,
315 groupId,
316 outputs,
317 overwrite,
318 actorUserId,
319 }
320 if (isTriggerDevEnabled) {
321 try {

Callers 2

updateWorkflowGroup (Function) · 0.85
addWorkflowGroupOutput (Function) · 0.85

Calls 11

generateId (Function) · 0.90
markTableJobRunning (Function) · 0.90
getErrorMessage (Function) · 0.90
runDetached (Function) · 0.90
selectCompletedExecPage (Function) · 0.85
processBackfillPage (Function) · 0.85
resolveTriggerRegion (Function) · 0.85
releaseJobClaim (Function) · 0.85
runTableBackfill (Function) · 0.85
warn (Method) · 0.65
eq (Function) · 0.50

Tested by

no test coverage detected