handleOpenAIEvent processes one SSE event block. It may emit SSE parts and/or return a StopReason and final message when the stream is complete. Return tuple: - final: a *StopReason to return immediately (e.g., after response.completed or error) - message: a *OpenAIChatMessage when response is comp
( event eventsource.Event, sse *sse.SSEHandlerCh, state *openaiStreamingState, cont *uctypes.WaveContinueResponse, )
| 710 | // - final: a *StopReason to return immediately (e.g., after response.completed or error) |
| 711 | // - message: a *OpenAIChatMessage when response is completed |
| 712 | func handleOpenAIEvent( |
| 713 | event eventsource.Event, |
| 714 | sse *sse.SSEHandlerCh, |
| 715 | state *openaiStreamingState, |
| 716 | cont *uctypes.WaveContinueResponse, |
| 717 | ) (final *uctypes.WaveStopReason, messages []*OpenAIChatMessage) { |
| 718 | if err := sse.Err(); err != nil { |
| 719 | return &uctypes.WaveStopReason{ |
| 720 | Kind: uctypes.StopKindCanceled, |
| 721 | ErrorType: "client_disconnect", |
| 722 | ErrorText: "client disconnected", |
| 723 | }, nil |
| 724 | } |
| 725 | |
| 726 | eventName := event.Event() |
| 727 | data := event.Data() |
| 728 | |
| 729 | switch eventName { |
| 730 | case "response.created": |
| 731 | var ev openaiResponseCreatedEvent |
| 732 | if err := json.Unmarshal([]byte(data), &ev); err != nil { |
| 733 | _ = sse.AiMsgError(err.Error()) |
| 734 | return &uctypes.WaveStopReason{Kind: uctypes.StopKindError, ErrorType: "decode", ErrorText: err.Error()}, nil |
| 735 | } |
| 736 | state.msgID = ev.Response.Id |
| 737 | state.model = ev.Response.Model |
| 738 | if cont == nil { |
| 739 | _ = sse.AiMsgStart(state.msgID) |
| 740 | } |
| 741 | return nil, nil |
| 742 | |
| 743 | case "response.in_progress": |
| 744 | // Start the step on in_progress |
| 745 | if !state.stepStarted { |
| 746 | _ = sse.AiMsgStartStep() |
| 747 | state.stepStarted = true |
| 748 | } |
| 749 | return nil, nil |
| 750 | |
| 751 | case "response.output_item.added": |
| 752 | var ev openaiResponseOutputItemAddedEvent |
| 753 | if err := json.Unmarshal([]byte(data), &ev); err != nil { |
| 754 | _ = sse.AiMsgError(err.Error()) |
| 755 | return &uctypes.WaveStopReason{Kind: uctypes.StopKindError, ErrorType: "decode", ErrorText: err.Error()}, nil |
| 756 | } |
| 757 | |
| 758 | switch ev.Item.Type { |
| 759 | case "reasoning": |
| 760 | // Create reasoning block - emit start immediately |
| 761 | id := uuid.New().String() |
| 762 | state.blockMap[ev.Item.Id] = &openaiBlockState{ |
| 763 | kind: openaiBlockReasoning, |
| 764 | localID: id, |
| 765 | summaryCount: 0, |
| 766 | } |
| 767 | _ = sse.AiMsgReasoningStart(id) |
| 768 | case "message": |
| 769 | // Message item - content parts will be handled in streaming events |
no test coverage detected