| 394 | * Callback for handling streaming execution events. |
| 395 | */ |
const onStreamCallback = async (streamingExec: StreamingExecutionWithBlockId) => {
  // Drains the stream of one streaming execution. Every decoded text chunk is
  // appended to `state.streamedChunks` under the execution's block id and
  // forwarded to the client through `controller`. Read failures are logged and
  // reported as a `stream_error` event instead of being rethrown.
  //
  // @param streamingExec - execution whose `stream` is read; it is ignored
  //   (with a warning) when `execution.blockId` is missing.
  const blockId = streamingExec.execution?.blockId
  if (!blockId) {
    logger.warn(`[${requestId}] Streaming execution missing blockId`)
    return
  }

  const reader = streamingExec.stream.getReader()
  const decoder = new TextDecoder()
  let isFirstChunk = true

  // Records a decoded chunk for this block and forwards it. Only the block's
  // first chunk goes through `sendChunk`; later chunks are enqueued directly.
  const forwardChunk = (textChunk: string) => {
    const recorded = state.streamedChunks.get(blockId)
    if (recorded) {
      recorded.push(textChunk)
    } else {
      state.streamedChunks.set(blockId, [textChunk])
    }

    if (isFirstChunk) {
      sendChunk(blockId, textChunk)
      isFirstChunk = false
    } else {
      controller.enqueue(encodeSSE({ blockId, chunk: textChunk }))
    }
  }

  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) {
        // Flush the decoder: with `stream: true` it buffers the bytes of an
        // incomplete trailing multi-byte sequence, which would otherwise be
        // dropped silently. For well-formed input the flush yields '' and
        // nothing extra is emitted.
        const tail = decoder.decode()
        if (tail) {
          forwardChunk(tail)
        }
        state.streamCompletionTimes.set(blockId, Date.now())
        break
      }

      forwardChunk(decoder.decode(value, { stream: true }))
    }
  } catch (error) {
    logger.error(`[${requestId}] Error reading stream for block ${blockId}:`, error)
    controller.enqueue(
      encodeSSE({
        event: 'stream_error',
        blockId,
        error: getErrorMessage(error, 'Stream reading error'),
      })
    )
  } finally {
    // Always give the lock back so the stream is not left locked to this
    // reader after completion or failure. No read is pending at this point.
    reader.releaseLock()
  }
}
| 439 | |
// Stream options read from `streamConfig`: `base64MaxBytes` is taken as-is,
// while `includeFileBase64` falls back to `true` when it is null or undefined.
const { base64MaxBytes } = streamConfig
const includeFileBase64 = streamConfig.includeFileBase64 ?? true