(controller)
| 382 | |
| 383 | const readable = new ReadableStream({ |
| 384 | async start(controller) { |
| 385 | const reader = response.body?.getReader() |
| 386 | if (!reader) { |
| 387 | controller.close() |
| 388 | return |
| 389 | } |
| 390 | |
| 391 | let finalUsage: any = null |
| 392 | let usageRecorded = false |
| 393 | |
| 394 | const flushUsage = async () => { |
| 395 | if (usageRecorded || !finalUsage) { |
| 396 | return |
| 397 | } |
| 398 | |
| 399 | usageRecorded = true |
| 400 | await updateUserStatsForWand( |
| 401 | billingUserId, |
| 402 | workspaceId, |
| 403 | finalUsage, |
| 404 | requestId, |
| 405 | isBYOK |
| 406 | ) |
| 407 | } |
| 408 | |
| 409 | try { |
| 410 | let buffer = '' |
| 411 | let chunkCount = 0 |
| 412 | let activeEventType: string | undefined |
| 413 | |
| 414 | while (true) { |
| 415 | const { done, value } = await reader.read() |
| 416 | |
| 417 | if (done) { |
| 418 | logger.info(`[${requestId}] Stream completed. Total chunks: ${chunkCount}`) |
| 419 | await flushUsage() |
| 420 | controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)) |
| 421 | controller.close() |
| 422 | break |
| 423 | } |
| 424 | |
| 425 | buffer += decoder.decode(value, { stream: true }) |
| 426 | |
| 427 | const lines = buffer.split('\n') |
| 428 | buffer = lines.pop() || '' |
| 429 | |
| 430 | for (const line of lines) { |
| 431 | const trimmed = line.trim() |
| 432 | if (!trimmed) { |
| 433 | continue |
| 434 | } |
| 435 | |
| 436 | if (trimmed.startsWith('event:')) { |
| 437 | activeEventType = trimmed.slice(6).trim() |
| 438 | continue |
| 439 | } |
| 440 | |
| 441 | if (!trimmed.startsWith('data:')) { |
nothing calls this directly
no test coverage detected