MCPcopy
hub / github.com/simstudioai/sim / dispatcherStep

Function dispatcherStep

apps/sim/lib/table/dispatcher.ts:370–653  ·  view source on GitHub ↗
(dispatchId: string)

Source from the content-addressed store, hash-verified

368/** Run one window of the dispatcher state machine. Caller re-invokes (via the
369 * trigger.dev task wrapper) until the returned status is `'done'`. */
370export async function dispatcherStep(dispatchId: string): Promise<DispatcherStepResult> {
371 const dispatch = await readDispatch(dispatchId)
372 if (!dispatch) {
373 logger.warn(`[${dispatchId}] dispatch row missing — aborting`)
374 return 'done'
375 }
376 if (dispatch.status === 'cancelled' || dispatch.status === 'complete') return 'done'
377
378 const { getTableById } = await import('@/lib/table/service')
379 const table = await getTableById(dispatch.tableId)
380 if (!table) {
381 logger.warn(`[${dispatchId}] table ${dispatch.tableId} missing — completing dispatch`)
382 await markDispatchComplete(dispatchId)
383 return 'done'
384 }
385
386 const allGroups = table.schema.workflowGroups ?? []
387 const targetGroups = allGroups.filter((g) => dispatch.scope.groupIds.includes(g.id))
388 if (targetGroups.length === 0) {
389 await markDispatchComplete(dispatchId)
390 return 'done'
391 }
392
393 // First iteration: just transition pending → dispatching. The bulk clear
394 // ran synchronously in `runWorkflowColumn` before this task fired, so the
395 // user already saw the column flip to empty/Pending before any cell
396 // started enqueueing.
397 if (dispatch.status === 'pending') {
398 await db
399 .update(tableRunDispatches)
400 .set({ status: 'dispatching' })
401 .where(eq(tableRunDispatches.id, dispatchId))
402 // Announce the dispatch the moment it starts — before the first window's
403 // cells finish. Without this, auto-fired and capped dispatches (no client-
404 // side optimistic seed) emit their first `dispatch` event only after window
405 // 1 completes, so the "X running" / Stop-all control stays hidden while a
406 // long first window runs. The client refetches the run-state count on this.
407 await appendTableEvent({
408 kind: 'dispatch',
409 tableId: dispatch.tableId,
410 dispatchId,
411 status: 'dispatching',
412 scope: dispatch.scope,
413 cursor: dispatch.cursor,
414 mode: dispatch.mode,
415 isManualRun: dispatch.isManualRun,
416 ...(dispatch.limit ? { limit: dispatch.limit } : {}),
417 })
418 }
419
420 const filters = [
421 eq(userTableRows.tableId, dispatch.tableId),
422 gt(userTableRows.position, dispatch.cursor),
423 ]
424 let hasJsonbFilter = false
425 if (dispatch.scope.rowIds && dispatch.scope.rowIds.length > 0) {
426 filters.push(inArray(userTableRows.id, dispatch.scope.rowIds))
427 } else if (dispatch.scope.filter) {

Callers 1

Calls 15

appendTableEventFunction · 0.90
buildFilterClauseFunction · 0.90
withSeqscanOffFunction · 0.90
toTableRowFunction · 0.90
isExecCancelledAfterFunction · 0.90
buildPendingRunsFunction · 0.90
buildEnqueueItemsFunction · 0.90
getJobQueueFunction · 0.90
toErrorFunction · 0.90
readDispatchFunction · 0.85
getTableByIdFunction · 0.85
markDispatchCompleteFunction · 0.85

Tested by

no test coverage detected