(
tableId: string,
rowId?: string,
options?: { groupIds?: string[]; filter?: Filter; excludeRowIds?: string[] }
)
| 363 | * active dispatches running (their in-flight checks skip cancelled cells). |
| 364 | */ |
| 365 | export async function cancelWorkflowGroupRuns( |
| 366 | tableId: string, |
| 367 | rowId?: string, |
| 368 | options?: { groupIds?: string[]; filter?: Filter; excludeRowIds?: string[] } |
| 369 | ): Promise<number> { |
| 370 | const { getTableById } = await import('@/lib/table/service') |
| 371 | const { updateRow } = await import('@/lib/table/rows/service') |
| 372 | const { getJobQueue } = await import('@/lib/core/async-jobs/config') |
| 373 | const { listActiveDispatches, markActiveDispatchesCancelled } = await import( |
| 374 | '@/lib/table/dispatcher' |
| 375 | ) |
| 376 | |
| 377 | const table = await getTableById(tableId) |
| 378 | if (!table) { |
| 379 | logger.warn(`cancelWorkflowGroupRuns: table ${tableId} not found`) |
| 380 | return 0 |
| 381 | } |
| 382 | |
| 383 | // Per-row cancel leaves the dispatcher alone — other rows in the same |
| 384 | // dispatch keep running. Table-wide cancel must stop it, else the cursor |
| 385 | // marches on and re-enqueues fresh cells past what we just cancelled. |
| 386 | // Filter-scoped cancel stops only dispatches with that exact filter scope |
| 387 | // (its own run); whole-table or differently-scoped dispatches keep running — |
| 388 | // their cells cancelled below are skipped via `cancelledAt > requestedAt`. |
| 389 | if (!rowId) { |
| 390 | await markActiveDispatchesCancelled(tableId, options?.filter, options?.excludeRowIds) |
| 391 | } |
| 392 | |
| 393 | const allGroups = table.schema.workflowGroups ?? [] |
| 394 | if (allGroups.length === 0) return 0 |
| 395 | const groupIds = options?.groupIds |
| 396 | ? new Set(allGroups.filter((g) => options.groupIds?.includes(g.id)).map((g) => g.id)) |
| 397 | : new Set(allGroups.map((g) => g.id)) |
| 398 | if (groupIds.size === 0) return 0 |
| 399 | |
| 400 | // Per-row Stop on a row the dispatcher hasn't reached yet has no sidecar |
| 401 | // entry to cancel — the dispatcher would later walk to that row, see no |
| 402 | // exec, classify eligible, and re-fire. Pre-write `cancelled` tombstones |
| 403 | // for active-dispatch in-scope groups so the existing `cancelledAt > |
| 404 | // dispatch.requestedAt` filter in `dispatcherStep` catches them. Skip |
| 405 | // when there's no active dispatch (nothing to outrun). |
| 406 | let aheadOfCursorTombstones: Array<{ groupId: string; workflowId: string }> = [] |
| 407 | if (rowId) { |
| 408 | const activeDispatches = await listActiveDispatches(tableId) |
| 409 | const relevant = activeDispatches.filter((d) => { |
| 410 | if (d.scope.rowIds && !d.scope.rowIds.includes(rowId)) return false |
| 411 | return d.scope.groupIds.some((gid) => groupIds.has(gid)) |
| 412 | }) |
| 413 | if (relevant.length > 0) { |
| 414 | // Intersection of targeted groups with active-dispatch scopes — only |
| 415 | // these groups are at risk of being re-fired by an in-progress dispatch. |
| 416 | const atRisk = new Set<string>() |
| 417 | for (const d of relevant) { |
| 418 | for (const gid of d.scope.groupIds) { |
| 419 | if (groupIds.has(gid)) atRisk.add(gid) |
| 420 | } |
| 421 | } |
| 422 | aheadOfCursorTombstones = Array.from(atRisk).map((gid) => ({ |
no test coverage detected