(workflowId: string, operation: any)
| 211 | } |
| 212 | |
| 213 | export async function persistWorkflowOperation(workflowId: string, operation: any) { |
| 214 | const startTime = Date.now() |
| 215 | try { |
| 216 | const { operation: op, target, payload, timestamp, userId } = operation |
| 217 | |
| 218 | const activeWorkflow = await getActiveWorkflowContext(workflowId) |
| 219 | if (!activeWorkflow) { |
| 220 | throw new Error(`Workflow ${workflowId} is archived or unavailable`) |
| 221 | } |
| 222 | |
| 223 | if (op === BLOCK_OPERATIONS.UPDATE_POSITION && randomFloat() < 0.01) { |
| 224 | logger.debug('Socket DB operation sample:', { |
| 225 | operation: op, |
| 226 | target, |
| 227 | workflowId: `${workflowId.substring(0, 8)}...`, |
| 228 | }) |
| 229 | } |
| 230 | |
| 231 | await db.transaction(async (tx) => { |
| 232 | await tx |
| 233 | .update(workflow) |
| 234 | .set({ updatedAt: new Date(timestamp) }) |
| 235 | .where(eq(workflow.id, workflowId)) |
| 236 | |
| 237 | switch (target) { |
| 238 | case OPERATION_TARGETS.BLOCK: |
| 239 | await handleBlockOperationTx(tx, workflowId, op, payload) |
| 240 | break |
| 241 | case OPERATION_TARGETS.BLOCKS: |
| 242 | await handleBlocksOperationTx(tx, workflowId, op, payload) |
| 243 | break |
| 244 | case OPERATION_TARGETS.EDGE: |
| 245 | await handleEdgeOperationTx(tx, workflowId, op, payload) |
| 246 | break |
| 247 | case OPERATION_TARGETS.EDGES: |
| 248 | await handleEdgesOperationTx(tx, workflowId, op, payload) |
| 249 | break |
| 250 | case OPERATION_TARGETS.SUBFLOW: |
| 251 | await handleSubflowOperationTx(tx, workflowId, op, payload) |
| 252 | break |
| 253 | case OPERATION_TARGETS.SUBBLOCK: |
| 254 | await handleSubblockOperationTx(tx, workflowId, op, payload) |
| 255 | break |
| 256 | case OPERATION_TARGETS.VARIABLE: |
| 257 | await handleVariableOperationTx(tx, workflowId, op, payload) |
| 258 | break |
| 259 | case OPERATION_TARGETS.WORKFLOW: |
| 260 | await handleWorkflowOperationTx(tx, workflowId, op, payload) |
| 261 | break |
| 262 | default: |
| 263 | throw new Error(`Unknown operation target: ${target}`) |
| 264 | } |
| 265 | }) |
| 266 | |
| 267 | // Audit workflow-level lock/unlock operations |
| 268 | if ( |
| 269 | target === OPERATION_TARGETS.BLOCKS && |
| 270 | op === BLOCKS_OPERATIONS.BATCH_TOGGLE_LOCKED && |
no test coverage detected