( ctx: WriteWorkflowGroupContext, payload: WriteWorkflowGroupStatePayload )
| 39 | * short-circuit any follow-up writes / job dispatch. |
| 40 | */ |
| 41 | export async function writeWorkflowGroupState( |
| 42 | ctx: WriteWorkflowGroupContext, |
| 43 | payload: WriteWorkflowGroupStatePayload |
| 44 | ): Promise<'wrote' | 'skipped'> { |
| 45 | const { tableId, rowId, workspaceId, groupId, executionId } = ctx |
| 46 | const requestId = ctx.requestId ?? `wfgrp-${executionId}` |
| 47 | const { getTableById } = await import('@/lib/table/service') |
| 48 | const { getRowById, updateRow } = await import('@/lib/table/rows/service') |
| 49 | |
| 50 | const table = await getTableById(tableId) |
| 51 | if (!table) { |
| 52 | logger.warn(`Table ${tableId} vanished before group state write`) |
| 53 | return 'wrote' |
| 54 | } |
| 55 | const row = await getRowById(tableId, rowId, workspaceId) |
| 56 | if (!row) { |
| 57 | logger.warn(`Row ${rowId} vanished before group state write`) |
| 58 | return 'wrote' |
| 59 | } |
| 60 | const current = row.executions?.[groupId] as RowExecutionMetadata | undefined |
| 61 | // Stale-worker guard: only blocks writes FROM an old worker (status = |
| 62 | // running / completed / error / pending). A `queued` stamp from the |
| 63 | // scheduler can claim the cell for a brand-new run — that's the new |
| 64 | // authority. Same for `cancelled` (always authoritative, written by stop). |
| 65 | const isCancelStamp = payload.executionState.status === 'cancelled' |
| 66 | const isQueuedStamp = payload.executionState.status === 'queued' |
| 67 | const isNewQueuedStamp = isQueuedStamp && current?.executionId !== executionId |
| 68 | const bypassStaleWorker = isNewQueuedStamp || isCancelStamp |
| 69 | if (!bypassStaleWorker && current && current.executionId && current.executionId !== executionId) { |
| 70 | logger.info( |
| 71 | `Skipping group write — stale worker (table=${tableId} row=${rowId} group=${groupId} mine=${executionId} active=${current.executionId})` |
| 72 | ) |
| 73 | return 'skipped' |
| 74 | } |
| 75 | // A late `queued` stamp for the SAME run that's already moved past queued |
| 76 | // (worker called markWorkflowGroupPickedUp before our parallel stamp landed) |
| 77 | // must NOT overwrite the further-along state. Without this, a cell can show |
| 78 | // "queued" forever while the worker is actually running. |
| 79 | if (isQueuedStamp && current?.executionId === executionId && current.status !== 'pending') { |
| 80 | logger.info( |
| 81 | `Skipping queued stamp — same run already at status=${current.status} (table=${tableId} row=${rowId} group=${groupId} executionId=${executionId})` |
| 82 | ) |
| 83 | return 'skipped' |
| 84 | } |
| 85 | // A `cancelled` cell rejects any worker write regardless of executionId — a |
| 86 | // stop click can only stamp the dispatcher pre-stamp's executionId (often |
| 87 | // null), so an executionId-matched guard would let the worker that later |
| 88 | // claims the cell with its real id resurrect it. `bypassStaleWorker` (a fresh |
| 89 | // `queued` claim from a new dispatch, or the authoritative cancel write |
| 90 | // itself) still passes; manual re-runs clear the tombstone before stamping. |
| 91 | if (!bypassStaleWorker && isExecCancelled(current)) { |
| 92 | logger.info( |
| 93 | `Skipping group write — cancelled (table=${tableId} row=${rowId} group=${groupId} executionId=${executionId})` |
| 94 | ) |
| 95 | return 'skipped' |
| 96 | } |
| 97 | // Skip writing `cancelled` state with the guard — that's an authoritative |
| 98 | // write from `cancelWorkflowGroupRuns` and must always land. New `queued` |
no test coverage detected