(dispatchId: string)
| 368 | /** Run one window of the dispatcher state machine. Caller re-invokes (via the |
| 369 | * trigger.dev task wrapper) until the returned status is `'done'`. */ |
| 370 | export async function dispatcherStep(dispatchId: string): Promise<DispatcherStepResult> { |
| 371 | const dispatch = await readDispatch(dispatchId) |
| 372 | if (!dispatch) { |
| 373 | logger.warn(`[${dispatchId}] dispatch row missing — aborting`) |
| 374 | return 'done' |
| 375 | } |
| 376 | if (dispatch.status === 'cancelled' || dispatch.status === 'complete') return 'done' |
| 377 | |
| 378 | const { getTableById } = await import('@/lib/table/service') |
| 379 | const table = await getTableById(dispatch.tableId) |
| 380 | if (!table) { |
| 381 | logger.warn(`[${dispatchId}] table ${dispatch.tableId} missing — completing dispatch`) |
| 382 | await markDispatchComplete(dispatchId) |
| 383 | return 'done' |
| 384 | } |
| 385 | |
| 386 | const allGroups = table.schema.workflowGroups ?? [] |
| 387 | const targetGroups = allGroups.filter((g) => dispatch.scope.groupIds.includes(g.id)) |
| 388 | if (targetGroups.length === 0) { |
| 389 | await markDispatchComplete(dispatchId) |
| 390 | return 'done' |
| 391 | } |
| 392 | |
| 393 | // First iteration: just transition pending → dispatching. The bulk clear |
| 394 | // ran synchronously in `runWorkflowColumn` before this task fired, so the |
| 395 | // user already saw the column flip to empty/Pending before any cell |
| 396 | // started enqueueing. |
| 397 | if (dispatch.status === 'pending') { |
| 398 | await db |
| 399 | .update(tableRunDispatches) |
| 400 | .set({ status: 'dispatching' }) |
| 401 | .where(eq(tableRunDispatches.id, dispatchId)) |
| 402 | // Announce the dispatch the moment it starts — before the first window's |
| 403 | // cells finish. Without this, auto-fired and capped dispatches (no client- |
| 404 | // side optimistic seed) emit their first `dispatch` event only after window |
| 405 | // 1 completes, so the "X running" / Stop-all control stays hidden while a |
| 406 | // long first window runs. The client refetches the run-state count on this. |
| 407 | await appendTableEvent({ |
| 408 | kind: 'dispatch', |
| 409 | tableId: dispatch.tableId, |
| 410 | dispatchId, |
| 411 | status: 'dispatching', |
| 412 | scope: dispatch.scope, |
| 413 | cursor: dispatch.cursor, |
| 414 | mode: dispatch.mode, |
| 415 | isManualRun: dispatch.isManualRun, |
| 416 | ...(dispatch.limit ? { limit: dispatch.limit } : {}), |
| 417 | }) |
| 418 | } |
| 419 | |
| 420 | const filters = [ |
| 421 | eq(userTableRows.tableId, dispatch.tableId), |
| 422 | gt(userTableRows.position, dispatch.cursor), |
| 423 | ] |
| 424 | let hasJsonbFilter = false |
| 425 | if (dispatch.scope.rowIds && dispatch.scope.rowIds.length > 0) { |
| 426 | filters.push(inArray(userTableRows.id, dispatch.scope.rowIds)) |
| 427 | } else if (dispatch.scope.filter) { |
no test coverage detected