MCPcopy
hub / github.com/TanStack/ai / processStreamChunks

Function processStreamChunks

packages/openai-base/src/adapters/responses-text.ts:760–1605  ·  view source on GitHub ↗

Processes streamed chunks from the Responses API and yields AG-UI events. Override this in subclasses to handle provider-specific stream behavior.

Handles the following event types:
- response.created / response.incomplete / response.failed
- response.output_text.delta

(
    stream: AsyncIterable<ResponseStreamEvent>,
    toolCallMetadata: Map<
      string,
      {
        index: number
        name: string
        started: boolean
        ended?: boolean
        pendingArguments?: string | undefined
      }
    >,
    options: TextOptions<TProviderOptions>,
    aguiState: {
      runId: string
      threadId: string
      messageId: string
      hasEmittedRunStarted: boolean
    },
  )

Source from the content-addressed store, hash-verified

758 * - error
759 */
760 protected async *processStreamChunks(
761 stream: AsyncIterable<ResponseStreamEvent>,
762 toolCallMetadata: Map<
763 string,
764 {
765 index: number
766 name: string
767 started: boolean
768 ended?: boolean
769 pendingArguments?: string | undefined
770 }
771 >,
772 options: TextOptions<TProviderOptions>,
773 aguiState: {
774 runId: string
775 threadId: string
776 messageId: string
777 hasEmittedRunStarted: boolean
778 },
779 ): AsyncIterable<StreamChunk> {
780 let accumulatedContent = ''
781 let accumulatedReasoning = ''
782
783 // Track if we've been streaming deltas to avoid duplicating content from done events
784 let hasStreamedContentDeltas = false
785 let hasStreamedReasoningDeltas = false
786
787 // Preserve response metadata across events
788 let model: string = options.model
789
790 // AG-UI lifecycle tracking
791 let stepId: string | null = null
792 let hasEmittedTextMessageStart = false
793 let hasEmittedStepStarted = false
794 // Track whether we've emitted a terminal RUN_FINISHED so the
795 // end-of-stream fallback below knows to synthesise one when the upstream
796 // cuts off without a response.completed event.
797 let runFinishedEmitted = false
798
799 try {
800 for await (const chunk of stream) {
801 options.logger.provider(`provider=${this.name} type=${chunk.type}`, {
802 provider: this.name,
803 type: chunk.type,
804 })
805
806 // Emit RUN_STARTED on first chunk
807 if (!aguiState.hasEmittedRunStarted) {
808 aguiState.hasEmittedRunStarted = true
809 yield {
810 type: EventType.RUN_STARTED,
811 runId: aguiState.runId,
812 threadId: aguiState.threadId,
813 model: model || options.model,
814 timestamp: Date.now(),
815 parentRunId: options.parentRunId,
816 }
817 }

Callers

nothing calls this directly

Calls 12

generateId (Function) · 0.90
buildResponsesUsage (Function) · 0.90
handleContentPart (Function) · 0.85
toRunErrorPayload (Function) · 0.85
toRunErrorRawEvent (Function) · 0.85
provider (Method) · 0.80
now (Method) · 0.80
errors (Method) · 0.80
parse (Method) · 0.80
has (Method) · 0.80
get (Method) · 0.65
set (Method) · 0.65

Tested by

no test coverage detected