MCPcopy Index your code
hub / github.com/simstudioai/sim / writeWorkflowGroupState

Function writeWorkflowGroupState

apps/sim/lib/table/cell-write.ts:41–141  ·  view source on GitHub ↗
(
  ctx: WriteWorkflowGroupContext,
  payload: WriteWorkflowGroupStatePayload
)

Source from the content-addressed store, hash-verified

39 * short-circuit any follow-up writes / job dispatch.
40 */
41export async function writeWorkflowGroupState(
42 ctx: WriteWorkflowGroupContext,
43 payload: WriteWorkflowGroupStatePayload
44): Promise<'wrote' | 'skipped'> {
45 const { tableId, rowId, workspaceId, groupId, executionId } = ctx
46 const requestId = ctx.requestId ?? `wfgrp-${executionId}`
47 const { getTableById } = await import('@/lib/table/service')
48 const { getRowById, updateRow } = await import('@/lib/table/rows/service')
49
50 const table = await getTableById(tableId)
51 if (!table) {
52 logger.warn(`Table ${tableId} vanished before group state write`)
53 return 'wrote'
54 }
55 const row = await getRowById(tableId, rowId, workspaceId)
56 if (!row) {
57 logger.warn(`Row ${rowId} vanished before group state write`)
58 return 'wrote'
59 }
60 const current = row.executions?.[groupId] as RowExecutionMetadata | undefined
61 // Stale-worker guard: only blocks writes FROM an old worker (status =
62 // running / completed / error / pending). A `queued` stamp from the
63 // scheduler can claim the cell for a brand-new run — that's the new
64 // authority. Same for `cancelled` (always authoritative, written by stop).
65 const isCancelStamp = payload.executionState.status === 'cancelled'
66 const isQueuedStamp = payload.executionState.status === 'queued'
67 const isNewQueuedStamp = isQueuedStamp && current?.executionId !== executionId
68 const bypassStaleWorker = isNewQueuedStamp || isCancelStamp
69 if (!bypassStaleWorker && current && current.executionId && current.executionId !== executionId) {
70 logger.info(
71 `Skipping group write — stale worker (table=${tableId} row=${rowId} group=${groupId} mine=${executionId} active=${current.executionId})`
72 )
73 return 'skipped'
74 }
75 // A late `queued` stamp for the SAME run that's already moved past queued
76 // (worker called markWorkflowGroupPickedUp before our parallel stamp landed)
77 // must NOT overwrite the further-along state. Without this, a cell can show
78 // "queued" forever while the worker is actually running.
79 if (isQueuedStamp && current?.executionId === executionId && current.status !== 'pending') {
80 logger.info(
81 `Skipping queued stamp — same run already at status=${current.status} (table=${tableId} row=${rowId} group=${groupId} executionId=${executionId})`
82 )
83 return 'skipped'
84 }
85 // A `cancelled` cell rejects any worker write regardless of executionId — a
86 // stop click can only stamp the dispatcher pre-stamp's executionId (often
87 // null), so an executionId-matched guard would let the worker that later
88 // claims the cell with its real id resurrect it. `bypassStaleWorker` (a fresh
89 // `queued` claim from a new dispatch, or the authoritative cancel write
90 // itself) still passes; manual re-runs clear the tombstone before stamping.
91 if (!bypassStaleWorker && isExecCancelled(current)) {
92 logger.info(
93 `Skipping group write — cancelled (table=${tableId} row=${rowId} group=${groupId} executionId=${executionId})`
94 )
95 return 'skipped'
96 }
97 // Skip writing `cancelled` state with the guard — that's an authoritative
98 // write from `cancelWorkflowGroupRuns` and must always land. New `queued`

Callers 5

stampQueuedForBatchFunction · 0.90
writeStateFunction · 0.85
cellOnBlockCompleteFunction · 0.85
writeCellTerminalFunction · 0.85

Calls 7

isExecCancelledFunction · 0.90
appendTableEventFunction · 0.90
getTableByIdFunction · 0.85
getRowByIdFunction · 0.85
updateRowFunction · 0.85
infoMethod · 0.80
warnMethod · 0.65

Tested by

no test coverage detected