processGeminiStream handles the streaming response from Gemini
( ctx context.Context, body io.Reader, sseHandler *sse.SSEHandlerCh, chatOpts uctypes.WaveChatOpts, cont *uctypes.WaveContinueResponse, )
| 272 | |
| 273 | // processGeminiStream handles the streaming response from Gemini |
| 274 | func processGeminiStream( |
| 275 | ctx context.Context, |
| 276 | body io.Reader, |
| 277 | sseHandler *sse.SSEHandlerCh, |
| 278 | chatOpts uctypes.WaveChatOpts, |
| 279 | cont *uctypes.WaveContinueResponse, |
| 280 | ) (*uctypes.WaveStopReason, *GeminiChatMessage, error) { |
| 281 | msgID := uuid.New().String() |
| 282 | textID := uuid.New().String() |
| 283 | textStarted := false |
| 284 | var textBuilder strings.Builder |
| 285 | var textThoughtSignature string |
| 286 | var finishReason string |
| 287 | var functionCalls []GeminiMessagePart |
| 288 | var usageMetadata *GeminiUsageMetadata |
| 289 | |
| 290 | if cont == nil { |
| 291 | _ = sseHandler.AiMsgStart(msgID) |
| 292 | } |
| 293 | _ = sseHandler.AiMsgStartStep() |
| 294 | |
| 295 | decoder := eventsource.NewDecoder(body) |
| 296 | |
| 297 | for { |
| 298 | if err := ctx.Err(); err != nil { |
| 299 | _ = sseHandler.AiMsgError("request cancelled") |
| 300 | return &uctypes.WaveStopReason{ |
| 301 | Kind: uctypes.StopKindCanceled, |
| 302 | ErrorType: "cancelled", |
| 303 | ErrorText: "request cancelled", |
| 304 | }, nil, err |
| 305 | } |
| 306 | |
| 307 | event, err := decoder.Decode() |
| 308 | if err != nil { |
| 309 | if errors.Is(err, io.EOF) { |
| 310 | break |
| 311 | } |
| 312 | if sseHandler.Err() != nil { |
| 313 | partialMsg := extractPartialGeminiMessage(msgID, textBuilder.String()) |
| 314 | return &uctypes.WaveStopReason{ |
| 315 | Kind: uctypes.StopKindCanceled, |
| 316 | ErrorType: "client_disconnect", |
| 317 | ErrorText: "client disconnected", |
| 318 | }, partialMsg, nil |
| 319 | } |
| 320 | _ = sseHandler.AiMsgError(fmt.Sprintf("stream decode error: %v", err)) |
| 321 | return &uctypes.WaveStopReason{ |
| 322 | Kind: uctypes.StopKindError, |
| 323 | ErrorType: "stream", |
| 324 | ErrorText: err.Error(), |
| 325 | }, nil, fmt.Errorf("stream decode error: %w", err) |
| 326 | } |
| 327 | |
| 328 | data := event.Data() |
| 329 | if data == "" { |
| 330 | continue |
| 331 | } |
no test coverage detected