( workflowId: string, pending: PendingSubblock, roomManager: IRoomManager )
| 235 | } |
| 236 | |
| 237 | async function flushSubblockUpdate( |
| 238 | workflowId: string, |
| 239 | pending: PendingSubblock, |
| 240 | roomManager: IRoomManager |
| 241 | ) { |
| 242 | const { blockId, subblockId, value, timestamp } = pending.latest |
| 243 | const io = roomManager.io |
| 244 | |
| 245 | try { |
| 246 | // Verify workflow still exists |
| 247 | const workflowExists = await db |
| 248 | .select({ id: workflow.id }) |
| 249 | .from(workflow) |
| 250 | .where(eq(workflow.id, workflowId)) |
| 251 | .limit(1) |
| 252 | |
| 253 | if (workflowExists.length === 0) { |
| 254 | pending.opToSocket.forEach((socketId, opId) => { |
| 255 | io.to(socketId).emit('operation-failed', { |
| 256 | operationId: opId, |
| 257 | error: 'Workflow not found', |
| 258 | retryable: true, |
| 259 | }) |
| 260 | }) |
| 261 | return |
| 262 | } |
| 263 | |
| 264 | try { |
| 265 | await assertWorkflowMutable(workflowId) |
| 266 | } catch (error) { |
| 267 | if (error instanceof WorkflowLockedError) { |
| 268 | pending.opToSocket.forEach((socketId, opId) => { |
| 269 | io.to(socketId).emit('operation-failed', { |
| 270 | operationId: opId, |
| 271 | error: error.message, |
| 272 | retryable: false, |
| 273 | }) |
| 274 | }) |
| 275 | return |
| 276 | } |
| 277 | throw error |
| 278 | } |
| 279 | |
| 280 | let updateSuccessful = false |
| 281 | let blockLocked = false |
| 282 | await db.transaction(async (tx) => { |
| 283 | const allBlocks = await tx |
| 284 | .select({ |
| 285 | id: workflowBlocks.id, |
| 286 | subBlocks: workflowBlocks.subBlocks, |
| 287 | locked: workflowBlocks.locked, |
| 288 | data: workflowBlocks.data, |
| 289 | }) |
| 290 | .from(workflowBlocks) |
| 291 | .where(eq(workflowBlocks.workflowId, workflowId)) |
| 292 | |
| 293 | type SubblockUpdateBlockRecord = (typeof allBlocks)[number] |
| 294 | const blocksById: Record<string, SubblockUpdateBlockRecord> = Object.fromEntries( |
no test coverage detected