(
data: {
tableId: string
groupId: string
blockId: string
path: string
/** Optional override; defaults to a slug derived from `path`. */
columnName?: string
/** The member adding the output — billed/gated for any backfill-triggered re-run. */
actorUserId?: string | null
},
requestId: string
)
| 594 | * new output next to its siblings. |
| 595 | */ |
| 596 | export async function addWorkflowGroupOutput( |
| 597 | data: { |
| 598 | tableId: string |
| 599 | groupId: string |
| 600 | blockId: string |
| 601 | path: string |
| 602 | /** Optional override; defaults to a slug derived from `path`. */ |
| 603 | columnName?: string |
| 604 | /** The member adding the output — billed/gated for any backfill-triggered re-run. */ |
| 605 | actorUserId?: string | null |
| 606 | }, |
| 607 | requestId: string |
| 608 | ): Promise<TableDefinition> { |
| 609 | // Phase 1 (no lock): load the workflow and resolve the pickable output plus |
| 610 | // its execution-order index. This depends only on the workflow graph (which |
| 611 | // is stable), so it runs OFF the advisory-lock critical section — holding the |
| 612 | // lock during this DB load would make concurrent adders on the same table |
| 613 | // time out waiting (the Mothership fan-out this fix targets). Phase 2 |
| 614 | // re-validates that the group still maps to the same workflow under the lock. |
| 615 | const preTable = await getTableById(data.tableId) |
| 616 | if (!preTable) throw new Error('Table not found') |
| 617 | const preGroup = (preTable.schema.workflowGroups ?? []).find((g) => g.id === data.groupId) |
| 618 | if (!preGroup) { |
| 619 | throw new Error(`Workflow group "${data.groupId}" not found`) |
| 620 | } |
| 621 | const workflowId = preGroup.workflowId |
| 622 | |
| 623 | const [ |
| 624 | { loadWorkflowFromNormalizedTables }, |
| 625 | { flattenWorkflowOutputs, getBlockExecutionOrder }, |
| 626 | { columnTypeForLeaf, deriveOutputColumnName }, |
| 627 | ] = await Promise.all([ |
| 628 | import('@/lib/workflows/persistence/utils'), |
| 629 | import('@/lib/workflows/blocks/flatten-outputs'), |
| 630 | import('@/lib/table/column-naming'), |
| 631 | ]) |
| 632 | const normalized = await loadWorkflowFromNormalizedTables(workflowId) |
| 633 | if (!normalized) { |
| 634 | throw new Error(`Workflow ${workflowId} not found`) |
| 635 | } |
| 636 | const blocks = Object.values(normalized.blocks ?? {}).map((b) => ({ |
| 637 | id: b.id, |
| 638 | type: b.type, |
| 639 | name: b.name, |
| 640 | triggerMode: (b as { triggerMode?: boolean }).triggerMode, |
| 641 | subBlocks: b.subBlocks as Record<string, unknown> | undefined, |
| 642 | })) |
| 643 | const flattened = flattenWorkflowOutputs(blocks, normalized.edges ?? []) |
| 644 | const match = flattened.find((f) => f.blockId === data.blockId && f.path === data.path) |
| 645 | if (!match) { |
| 646 | throw new Error( |
| 647 | `Output ${data.blockId}::${data.path} is not a valid pickable output on workflow ${workflowId}` |
| 648 | ) |
| 649 | } |
| 650 | const newColumnType = columnTypeForLeaf(match.leafType) |
| 651 | const distances = getBlockExecutionOrder(blocks, normalized.edges ?? []) |
| 652 | const flatIndex = new Map(flattened.map((f, i) => [`${f.blockId}::${f.path}`, i])) |
| 653 |
no test coverage detected