( workflowId: string, pending: PendingVariable, roomManager: IRoomManager )
| 222 | } |
| 223 | |
| 224 | async function flushVariableUpdate( |
| 225 | workflowId: string, |
| 226 | pending: PendingVariable, |
| 227 | roomManager: IRoomManager |
| 228 | ) { |
| 229 | const { variableId, field, value, timestamp } = pending.latest |
| 230 | const io = roomManager.io |
| 231 | |
| 232 | try { |
| 233 | const workflowExists = await db |
| 234 | .select({ id: workflow.id }) |
| 235 | .from(workflow) |
| 236 | .where(eq(workflow.id, workflowId)) |
| 237 | .limit(1) |
| 238 | |
| 239 | if (workflowExists.length === 0) { |
| 240 | pending.opToSocket.forEach((socketId, opId) => { |
| 241 | io.to(socketId).emit('operation-failed', { |
| 242 | operationId: opId, |
| 243 | error: 'Workflow not found', |
| 244 | retryable: true, |
| 245 | }) |
| 246 | }) |
| 247 | return |
| 248 | } |
| 249 | |
| 250 | try { |
| 251 | await assertWorkflowMutable(workflowId) |
| 252 | } catch (error) { |
| 253 | if (error instanceof WorkflowLockedError) { |
| 254 | pending.opToSocket.forEach((socketId, opId) => { |
| 255 | io.to(socketId).emit('operation-failed', { |
| 256 | operationId: opId, |
| 257 | error: error.message, |
| 258 | retryable: false, |
| 259 | }) |
| 260 | }) |
| 261 | return |
| 262 | } |
| 263 | throw error |
| 264 | } |
| 265 | |
| 266 | let updateSuccessful = false |
| 267 | await db.transaction(async (tx) => { |
| 268 | const [workflowRecord] = await tx |
| 269 | .select({ variables: workflow.variables }) |
| 270 | .from(workflow) |
| 271 | .where(eq(workflow.id, workflowId)) |
| 272 | .limit(1) |
| 273 | |
| 274 | if (!workflowRecord) { |
| 275 | return |
| 276 | } |
| 277 | |
| 278 | const variables = (workflowRecord.variables as any) || {} |
| 279 | if (!variables[variableId]) { |
| 280 | return |
| 281 | } |
no test coverage detected