MCPcopy Index your code
hub / github.com/simstudioai/sim / createStreamingResponse

Function createStreamingResponse

apps/sim/lib/workflows/streaming/streaming.ts:350–643  ·  view source on GitHub ↗
(
  options: StreamingResponseOptions
)

Source from the content-addressed store, hash-verified

348}
349
350export async function createStreamingResponse(
351 options: StreamingResponseOptions
352): Promise<ReadableStream> {
353 const { requestId, streamConfig, executionId, executeFn } = options
354 const durableContext = {
355 workspaceId: options.workspaceId,
356 workflowId: options.workflowId,
357 executionId,
358 userId: options.userId,
359 requireDurable: Boolean(options.workspaceId && options.workflowId && executionId),
360 }
361 const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs)
362
363 return new ReadableStream({
364 async start(controller) {
365 const state: StreamingState = {
366 streamedChunks: new Map(),
367 processedOutputs: new Set(),
368 streamCompletionTimes: new Map(),
369 completedBlockIds: new Set(),
370 selectedOutputBytes: 0,
371 streamedSelectedOutputKeys: new Set(),
372 }
373
374 const sendChunk = (
375 blockId: string,
376 content: string,
377 options: { selectedOutputKey?: string; selectedOutputBytes?: number } = {}
378 ) => {
379 const separator = state.processedOutputs.size > 0 ? '\n\n' : ''
380 const chunk = separator + content
381 if (options.selectedOutputKey) {
382 const selectedOutputBytes =
383 options.selectedOutputBytes ?? Buffer.byteLength(chunk, 'utf8')
384 const nextSelectedOutputBytes = state.selectedOutputBytes + selectedOutputBytes
385 assertInlineMaterializationSize(nextSelectedOutputBytes, MAX_INLINE_MATERIALIZATION_BYTES)
386 state.selectedOutputBytes = nextSelectedOutputBytes
387 state.streamedSelectedOutputKeys.add(options.selectedOutputKey)
388 }
389 controller.enqueue(encodeSSE({ blockId, chunk }))
390 state.processedOutputs.add(blockId)
391 }
392
393 /**
394 * Callback for handling streaming execution events.
395 */
396 const onStreamCallback = async (streamingExec: StreamingExecutionWithBlockId) => {
397 const blockId = streamingExec.execution?.blockId
398 if (!blockId) {
399 logger.warn(`[${requestId}] Streaming execution missing blockId`)
400 return
401 }
402
403 const reader = streamingExec.stream.getReader()
404 const decoder = new TextDecoder()
405 let isFirstChunk = true
406
407 try {

Callers 4

streaming.test.ts · File · 0.90
route.ts · File · 0.90
handleExecutePost · Function · 0.90
route.ts · File · 0.85

Calls 1

Tested by

no test coverage detected