MCPcopy
hub / github.com/simstudioai/sim / flushVariableUpdate

Function flushVariableUpdate

apps/realtime/src/handlers/variables.ts:224–340  ·  view source on GitHub ↗
(
  workflowId: string,
  pending: PendingVariable,
  roomManager: IRoomManager
)

Source from the content-addressed store, hash-verified

222}
223
224async function flushVariableUpdate(
225 workflowId: string,
226 pending: PendingVariable,
227 roomManager: IRoomManager
228) {
229 const { variableId, field, value, timestamp } = pending.latest
230 const io = roomManager.io
231
232 try {
233 const workflowExists = await db
234 .select({ id: workflow.id })
235 .from(workflow)
236 .where(eq(workflow.id, workflowId))
237 .limit(1)
238
239 if (workflowExists.length === 0) {
240 pending.opToSocket.forEach((socketId, opId) => {
241 io.to(socketId).emit('operation-failed', {
242 operationId: opId,
243 error: 'Workflow not found',
244 retryable: true,
245 })
246 })
247 return
248 }
249
250 try {
251 await assertWorkflowMutable(workflowId)
252 } catch (error) {
253 if (error instanceof WorkflowLockedError) {
254 pending.opToSocket.forEach((socketId, opId) => {
255 io.to(socketId).emit('operation-failed', {
256 operationId: opId,
257 error: error.message,
258 retryable: false,
259 })
260 })
261 return
262 }
263 throw error
264 }
265
266 let updateSuccessful = false
267 await db.transaction(async (tx) => {
268 const [workflowRecord] = await tx
269 .select({ variables: workflow.variables })
270 .from(workflow)
271 .where(eq(workflow.id, workflowId))
272 .limit(1)
273
274 if (!workflowRecord) {
275 return
276 }
277
278 const variables = (workflowRecord.variables as any) || {}
279 if (!variables[variableId]) {
280 return
281 }

Callers 1

setupVariablesHandlersFunction · 0.85

Calls 6

assertWorkflowMutableFunction · 0.90
getErrorMessageFunction · 0.90
debugMethod · 0.80
errorMethod · 0.80
setMethod · 0.65
eqFunction · 0.50

Tested by

no test coverage detected