handleOpenAIStreamingResp handles the OpenAI SSE streaming response
(ctx context.Context, sse *sse.SSEHandlerCh, decoder *eventsource.Decoder, cont *uctypes.WaveContinueResponse, chatOpts uctypes.WaveChatOpts)
| 603 | |
| 604 | // handleOpenAIStreamingResp handles the OpenAI SSE streaming response |
| 605 | func handleOpenAIStreamingResp(ctx context.Context, sse *sse.SSEHandlerCh, decoder *eventsource.Decoder, cont *uctypes.WaveContinueResponse, chatOpts uctypes.WaveChatOpts) (*uctypes.WaveStopReason, []*OpenAIChatMessage) { |
| 606 | // Per-response state |
| 607 | state := &openaiStreamingState{ |
| 608 | blockMap: map[string]*openaiBlockState{}, |
| 609 | chatOpts: chatOpts, |
| 610 | } |
| 611 | |
| 612 | var rtnStopReason *uctypes.WaveStopReason |
| 613 | var rtnMessages []*OpenAIChatMessage |
| 614 | |
| 615 | // Ensure step is closed on error/cancellation |
| 616 | defer func() { |
| 617 | if !state.stepStarted { |
| 618 | return |
| 619 | } |
| 620 | _ = sse.AiMsgFinishStep() |
| 621 | if rtnStopReason == nil || rtnStopReason.Kind != uctypes.StopKindToolUse { |
| 622 | _ = sse.AiMsgFinish("", nil) |
| 623 | } |
| 624 | }() |
| 625 | |
| 626 | // SSE event processing loop |
| 627 | for { |
| 628 | event, err := decoder.Decode() |
| 629 | if err != nil { |
| 630 | if errors.Is(err, io.EOF) { |
| 631 | // EOF without proper completion - protocol error |
| 632 | _ = sse.AiMsgError("stream ended unexpectedly without completion") |
| 633 | return &uctypes.WaveStopReason{ |
| 634 | Kind: uctypes.StopKindError, |
| 635 | ErrorType: "protocol", |
| 636 | ErrorText: "stream ended unexpectedly without completion", |
| 637 | }, rtnMessages |
| 638 | } |
| 639 | // Check if client disconnected |
| 640 | if sse.Err() != nil { |
| 641 | // SSE connection broken (client stopped/disconnected) |
| 642 | partialMessages := extractPartialTextFromState(state) |
| 643 | if partialMessages != nil { |
| 644 | rtnMessages = append(rtnMessages, partialMessages...) |
| 645 | } |
| 646 | return &uctypes.WaveStopReason{ |
| 647 | Kind: uctypes.StopKindCanceled, |
| 648 | ErrorType: "client_disconnect", |
| 649 | ErrorText: "client disconnected", |
| 650 | }, rtnMessages |
| 651 | } |
| 652 | // transport error mid-stream |
| 653 | _ = sse.AiMsgError(err.Error()) |
| 654 | return &uctypes.WaveStopReason{ |
| 655 | Kind: uctypes.StopKindError, |
| 656 | ErrorType: "stream", |
| 657 | ErrorText: err.Error(), |
| 658 | }, rtnMessages |
| 659 | } |
| 660 | |
| 661 | if finalStopReason, finalMessages := handleOpenAIEvent(event, sse, state, cont); finalStopReason != nil { |
| 662 | rtnStopReason = finalStopReason |
no test coverage detected