( payload: WorkflowGroupCellPayload, signal?: AbortSignal )
| 102 | * cascade become visible to the eligibility check. The resume worker must |
| 103 | * already hold the row's cascade lock before calling. */ |
export async function runRowCascadeLoop(
  payload: WorkflowGroupCellPayload,
  signal?: AbortSignal
): Promise<'blocked' | undefined> {
  // Overview: run the payload's workflow group for one row, then keep
  // chaining into whichever group `pickNextEligibleGroupForRow` selects next
  // for the same row, until nothing is eligible, the run pauses, the table /
  // group / row disappears, or the abort signal fires.
  //
  // Resolves to 'blocked' only when a run reports a hard stop; every other
  // exit path resolves to undefined.
  const { tableId, rowId, workspaceId } = payload

  // Service modules are loaded lazily, one after another, on entry.
  const { getTableById: loadTable } = await import('@/lib/table/service')
  const { getRowById: loadRow } = await import('@/lib/table/rows/service')
  const { pickNextEligibleGroupForRow: pickNextGroup } = await import(
    '@/lib/table/workflow-columns'
  )

  // Cursor describing the group currently being executed. The executionId is
  // regenerated for every subsequent group: the SQL guard rejects writes whose
  // id differs from row.executions[gid].executionId, so each group needs its
  // own claim.
  let active = {
    groupId: payload.groupId,
    workflowId: payload.workflowId,
    executionId: payload.executionId,
  }

  // The abort flag is re-checked before each group is started.
  while (!signal?.aborted) {
    // Re-read the table on every pass so the group lookup uses the current schema.
    const table = await loadTable(tableId)
    if (!table) {
      logger.warn(`Table ${tableId} vanished mid-cascade`)
      return undefined
    }

    const group = table.schema.workflowGroups?.find((g) => g.id === active.groupId)
    if (!group) {
      logger.warn(`Group ${active.groupId} no longer exists on table ${tableId}`)
      return undefined
    }

    const outcome = await runWorkflowAndWriteTerminal(
      {
        ...payload,
        groupId: active.groupId,
        workflowId: active.workflowId,
        executionId: active.executionId,
      },
      signal,
      table,
      group
    )

    // A paused run ends the cascade without signalling anything to the caller.
    if (outcome === 'paused') {
      return undefined
    }
    // Hard stop (e.g. usage limit): the dispatch was halted and no cell was
    // marked. Propagate so the outer re-drive loop stops too — otherwise it
    // would re-pick the still-pending queued marker and spin.
    if (outcome === 'blocked') {
      return 'blocked'
    }

    // Re-read the row after the run before deciding which group comes next.
    const row = await loadRow(tableId, rowId, workspaceId)
    if (!row) {
      return undefined
    }

    const upcoming = pickNextGroup(table, row, active.groupId)
    if (!upcoming) {
      return undefined
    }

    active = {
      groupId: upcoming.id,
      workflowId: upcoming.workflowId,
      executionId: generateId(),
    }
  }

  return undefined
}
| 161 |
no test coverage detected