 * Processes streamed chunks from the Responses API and yields AG-UI events.
 * Override this in subclasses to handle provider-specific stream behavior.
 *
 * Handles the following event types:
 * - response.created / response.incomplete / response.failed
 * - response.output_text.delta
(
stream: AsyncIterable<ResponseStreamEvent>,
toolCallMetadata: Map<
string,
{
index: number
name: string
started: boolean
ended?: boolean
pendingArguments?: string | undefined
}
>,
options: TextOptions<TProviderOptions>,
aguiState: {
runId: string
threadId: string
messageId: string
hasEmittedRunStarted: boolean
},
)
| 758 | * - error |
| 759 | */ |
| 760 | protected async *processStreamChunks( |
| 761 | stream: AsyncIterable<ResponseStreamEvent>, |
| 762 | toolCallMetadata: Map< |
| 763 | string, |
| 764 | { |
| 765 | index: number |
| 766 | name: string |
| 767 | started: boolean |
| 768 | ended?: boolean |
| 769 | pendingArguments?: string | undefined |
| 770 | } |
| 771 | >, |
| 772 | options: TextOptions<TProviderOptions>, |
| 773 | aguiState: { |
| 774 | runId: string |
| 775 | threadId: string |
| 776 | messageId: string |
| 777 | hasEmittedRunStarted: boolean |
| 778 | }, |
| 779 | ): AsyncIterable<StreamChunk> { |
| 780 | let accumulatedContent = '' |
| 781 | let accumulatedReasoning = '' |
| 782 | |
| 783 | // Track if we've been streaming deltas to avoid duplicating content from done events |
| 784 | let hasStreamedContentDeltas = false |
| 785 | let hasStreamedReasoningDeltas = false |
| 786 | |
| 787 | // Preserve response metadata across events |
| 788 | let model: string = options.model |
| 789 | |
| 790 | // AG-UI lifecycle tracking |
| 791 | let stepId: string | null = null |
| 792 | let hasEmittedTextMessageStart = false |
| 793 | let hasEmittedStepStarted = false |
| 794 | // Track whether we've emitted a terminal RUN_FINISHED so the |
| 795 | // end-of-stream fallback below knows to synthesise one when the upstream |
| 796 | // cuts off without a response.completed event. |
| 797 | let runFinishedEmitted = false |
| 798 | |
| 799 | try { |
| 800 | for await (const chunk of stream) { |
| 801 | options.logger.provider(`provider=${this.name} type=${chunk.type}`, { |
| 802 | provider: this.name, |
| 803 | type: chunk.type, |
| 804 | }) |
| 805 | |
| 806 | // Emit RUN_STARTED on first chunk |
| 807 | if (!aguiState.hasEmittedRunStarted) { |
| 808 | aguiState.hasEmittedRunStarted = true |
| 809 | yield { |
| 810 | type: EventType.RUN_STARTED, |
| 811 | runId: aguiState.runId, |
| 812 | threadId: aguiState.threadId, |
| 813 | model: model || options.model, |
| 814 | timestamp: Date.now(), |
| 815 | parentRunId: options.parentRunId, |
| 816 | } |
| 817 | } |
Nothing calls this method directly.
No test coverage was detected for this method.