MCPcopy
hub / github.com/simstudioai/sim / start

Function start

apps/sim/lib/workflows/streaming/streaming.ts:364–629  ·  view source on GitHub ↗
(controller)

Source from the content-addressed store, hash-verified

362
363 return new ReadableStream({
364 async start(controller) {
365 const state: StreamingState = {
366 streamedChunks: new Map(),
367 processedOutputs: new Set(),
368 streamCompletionTimes: new Map(),
369 completedBlockIds: new Set(),
370 selectedOutputBytes: 0,
371 streamedSelectedOutputKeys: new Set(),
372 }
373
/**
 * Enqueues one SSE frame for `blockId` on the stream controller and records
 * the block as processed.
 *
 * A blank-line separator (`\n\n`) is prepended to `content` whenever at least
 * one block id is already present in `state.processedOutputs`.
 *
 * When `options.selectedOutputKey` is truthy, the frame is also counted against
 * the running `state.selectedOutputBytes` total: the caller-supplied
 * `options.selectedOutputBytes` is used if provided, otherwise the UTF-8 byte
 * length of the outgoing chunk (separator included). The new total is passed to
 * `assertInlineMaterializationSize` before any state is updated.
 * NOTE(review): presumably that helper throws when the total exceeds
 * `MAX_INLINE_MATERIALIZATION_BYTES` — confirm against its definition.
 */
const sendChunk = (
  blockId: string,
  content: string,
  options: { selectedOutputKey?: string; selectedOutputBytes?: number } = {}
) => {
  const hasPriorOutput = state.processedOutputs.size > 0
  const chunk = hasPriorOutput ? `\n\n${content}` : content

  const outputKey = options.selectedOutputKey
  if (outputKey) {
    const addedBytes = options.selectedOutputBytes ?? Buffer.byteLength(chunk, 'utf8')
    const runningTotal = state.selectedOutputBytes + addedBytes
    // Check the prospective total first so state is only updated after the check passes.
    assertInlineMaterializationSize(runningTotal, MAX_INLINE_MATERIALIZATION_BYTES)
    state.selectedOutputBytes = runningTotal
    state.streamedSelectedOutputKeys.add(outputKey)
  }

  controller.enqueue(encodeSSE({ blockId, chunk }))
  state.processedOutputs.add(blockId)
}
392
393 /**
394 * Callback for handling streaming execution events.
395 */
396 const onStreamCallback = async (streamingExec: StreamingExecutionWithBlockId) => {
397 const blockId = streamingExec.execution?.blockId
398 if (!blockId) {
399 logger.warn(`[${requestId}] Streaming execution missing blockId`)
400 return
401 }
402
403 const reader = streamingExec.stream.getReader()
404 const decoder = new TextDecoder()
405 let isFirstChunk = true
406
407 try {
408 while (true) {
409 const { done, value } = await reader.read()
410 if (done) {
411 state.streamCompletionTimes.set(blockId, Date.now())
412 break
413 }
414
415 const textChunk = decoder.decode(value, { stream: true })
416 if (!state.streamedChunks.has(blockId)) {
417 state.streamedChunks.set(blockId, [])
418 }
419 state.streamedChunks.get(blockId)!.push(textChunk)
420
421 if (isFirstChunk) {

Callers

nothing calls this directly

Calls 15

getTimeoutErrorMessage (Function) · 0.90
encodeSSE (Function) · 0.90
getErrorMessage (Function) · 0.90
resolveStreamedContent (Function) · 0.85
completeLoggingSession (Function) · 0.85
buildMinimalResult (Function) · 0.85
info (Method) · 0.80
markAsFailed (Method) · 0.80

Tested by

no test coverage detected