MCPcopy
hub / github.com/simstudioai/sim / onStreamCallback

Function onStreamCallback

apps/sim/lib/workflows/streaming/streaming.ts:396–438  ·  view source on GitHub ↗
(streamingExec: StreamingExecutionWithBlockId)

Source from the content-addressed store, hash-verified

394 * Callback for handling streaming execution events.
395 */
const onStreamCallback = async (streamingExec: StreamingExecutionWithBlockId) => {
  // Drains the stream of one streaming execution: every decoded text chunk is
  // appended to `state.streamedChunks` under the block's id and forwarded to the
  // client. The first chunk goes through `sendChunk`; later chunks are enqueued
  // directly as SSE payloads. Read failures are logged and reported to the client
  // as a `stream_error` event instead of being rethrown.
  const blockId = streamingExec.execution?.blockId
  if (!blockId) {
    logger.warn(`[${requestId}] Streaming execution missing blockId`)
    return
  }

  const reader = streamingExec.stream.getReader()
  const decoder = new TextDecoder()
  let isFirstChunk = true

  // Records one decoded chunk for this block and forwards it to the client.
  const emitChunk = (textChunk: string) => {
    const chunks = state.streamedChunks.get(blockId)
    if (chunks) {
      chunks.push(textChunk)
    } else {
      state.streamedChunks.set(blockId, [textChunk])
    }

    if (isFirstChunk) {
      sendChunk(blockId, textChunk)
      isFirstChunk = false
    } else {
      controller.enqueue(encodeSSE({ blockId, chunk: textChunk }))
    }
  }

  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) {
        // Decoding with `stream: true` buffers an incomplete trailing multi-byte
        // sequence; a final argument-less decode() flushes it so it is not lost.
        const tail = decoder.decode()
        if (tail) {
          emitChunk(tail)
        }
        state.streamCompletionTimes.set(blockId, Date.now())
        break
      }

      emitChunk(decoder.decode(value, { stream: true }))
    }
  } catch (error) {
    logger.error(`[${requestId}] Error reading stream for block ${blockId}:`, error)
    controller.enqueue(
      encodeSSE({
        event: 'stream_error',
        blockId,
        error: getErrorMessage(error, 'Stream reading error'),
      })
    )
  } finally {
    // Release the reader's lock on the stream on both the success and error paths.
    reader.releaseLock()
  }
}
439
440 const includeFileBase64 = streamConfig.includeFileBase64 ?? true
441 const base64MaxBytes = streamConfig.base64MaxBytes

Callers

nothing calls this directly

Calls 10

encodeSSE — Function · 0.90
getErrorMessage — Function · 0.90
sendChunk — Function · 0.85
error — Method · 0.80
warn — Method · 0.65
set — Method · 0.65
get — Method · 0.65
enqueue — Method · 0.65
read — Method · 0.45
push — Method · 0.45

Tested by

no test coverage detected