MCPcopy Index your code
hub / github.com/simstudioai/sim / runRowCascadeLoop

Function runRowCascadeLoop

apps/sim/background/workflow-column-execution.ts:104–160  ·  view source on GitHub ↗
(
  payload: WorkflowGroupCellPayload,
  signal?: AbortSignal
)

Source from the content-addressed store, hash-verified

102 * cascade become visible to the eligibility check. The resume worker must
103 * already hold the row's cascade lock before calling. */
export async function runRowCascadeLoop(
  payload: WorkflowGroupCellPayload,
  signal?: AbortSignal
): Promise<'blocked' | undefined> {
  // Drives one row through successive workflow groups: run the current group,
  // re-read the row, pick the next eligible group, repeat. Resolves to
  // 'blocked' only when a run reports a hard stop; every other exit path
  // (abort, missing table/group/row, paused run, nothing left to run)
  // resolves to undefined.
  const { tableId, rowId, workspaceId } = payload

  // Service modules are loaded lazily, in the same order on every call.
  const tableService = await import('@/lib/table/service')
  const rowService = await import('@/lib/table/rows/service')
  const workflowColumns = await import('@/lib/table/workflow-columns')

  // The group being executed on this pass. The first pass reuses the ids from
  // the incoming payload; later passes mint a new executionId, because the SQL
  // guard rejects writes whose id differs from row.executions[gid].executionId
  // and each group therefore needs its own claim.
  let cursor = {
    groupId: payload.groupId,
    workflowId: payload.workflowId,
    executionId: payload.executionId,
  }

  // The abort flag is consulted once per pass, before any work for that pass.
  while (!signal?.aborted) {
    const { groupId } = cursor

    // Re-read the table on every pass so the group lookup below runs against
    // the current schema rather than a snapshot from an earlier pass.
    const table = await tableService.getTableById(tableId)
    if (!table) {
      logger.warn(`Table ${tableId} vanished mid-cascade`)
      return undefined
    }

    const group = table.schema.workflowGroups?.find((candidate) => candidate.id === groupId)
    if (!group) {
      logger.warn(`Group ${groupId} no longer exists on table ${tableId}`)
      return undefined
    }

    const outcome = await runWorkflowAndWriteTerminal({ ...payload, ...cursor }, signal, table, group)

    // Hard stop (e.g. usage limit): the dispatch was halted and no cell was
    // marked. Surface it so the outer re-drive loop stops as well; otherwise
    // it would re-pick the still-pending queued marker and spin.
    if (outcome === 'blocked') return 'blocked'
    // A paused run ends the cascade for this invocation.
    if (outcome === 'paused') return undefined

    // Eligibility for the next group is judged against the row as it stands
    // after the run that just finished.
    const row = await rowService.getRowById(tableId, rowId, workspaceId)
    if (!row) return undefined

    const upcoming = workflowColumns.pickNextEligibleGroupForRow(table, row, groupId)
    if (!upcoming) return undefined

    cursor = {
      groupId: upcoming.id,
      workflowId: upcoming.workflowId,
      executionId: generateId(),
    }
  }

  return undefined
}
161

Callers 2

Calls 6

generateId (Function) · 0.90
getTableById (Function) · 0.85
getRowById (Function) · 0.85
warn (Method) · 0.65

Tested by

no test coverage detected