(controller)
| 373 | |
| 374 | const stream = new ReadableStream({ |
| 375 | async start(controller) { |
| 376 | const decoder = new TextDecoder() |
| 377 | let buffer = '' |
| 378 | |
| 379 | controller.enqueue( |
| 380 | new TextEncoder().encode(`: connected ${new Date().toISOString()}\n`), |
| 381 | ) |
| 382 | |
| 383 | heartbeatInterval = setInterval(() => { |
| 384 | if (!clientDisconnected) { |
| 385 | try { |
| 386 | controller.enqueue( |
| 387 | new TextEncoder().encode( |
| 388 | `: heartbeat ${new Date().toISOString()}\n\n`, |
| 389 | ), |
| 390 | ) |
| 391 | } catch { |
| 392 | // client disconnected |
| 393 | } |
| 394 | } |
| 395 | }, 30000) |
| 396 | |
| 397 | try { |
| 398 | let done = false |
| 399 | while (!done) { |
| 400 | const result = await reader.read() |
| 401 | done = result.done |
| 402 | const value = result.value |
| 403 | |
| 404 | if (done) break |
| 405 | |
| 406 | buffer += decoder.decode(value, { stream: true }) |
| 407 | let lineEnd = buffer.indexOf('\n') |
| 408 | |
| 409 | while (lineEnd !== -1) { |
| 410 | const line = buffer.slice(0, lineEnd + 1) |
| 411 | buffer = buffer.slice(lineEnd + 1) |
| 412 | |
| 413 | const lineResult = await handleLine({ |
| 414 | userId, |
| 415 | stripeCustomerId, |
| 416 | agentId, |
| 417 | clientId, |
| 418 | clientRequestId, |
| 419 | costMode, |
| 420 | startTime, |
| 421 | request: auditRequest, |
| 422 | originalModel, |
| 423 | line, |
| 424 | state, |
| 425 | logger, |
| 426 | insertMessage: insertMessageBigquery, |
| 427 | }) |
| 428 | state = lineResult.state |
| 429 | |
| 430 | if (!clientDisconnected) { |
| 431 | try { |
| 432 | controller.enqueue( |
nothing calls this directly
no test coverage detected