(controller)
| 228 | |
| 229 | const stream = new ReadableStream({ |
| 230 | async start(controller) { |
| 231 | const decoder = new TextDecoder() |
| 232 | let buffer = '' |
| 233 | |
| 234 | controller.enqueue( |
| 235 | new TextEncoder().encode(`: connected ${new Date().toISOString()}\n`), |
| 236 | ) |
| 237 | |
| 238 | heartbeatInterval = setInterval(() => { |
| 239 | if (!clientDisconnected) { |
| 240 | try { |
| 241 | controller.enqueue( |
| 242 | new TextEncoder().encode( |
| 243 | `: heartbeat ${new Date().toISOString()}\n\n`, |
| 244 | ), |
| 245 | ) |
| 246 | } catch { |
| 247 | // client disconnected |
| 248 | } |
| 249 | } |
| 250 | }, 30000) |
| 251 | |
| 252 | try { |
| 253 | let done = false |
| 254 | while (!done) { |
| 255 | const result = await reader.read() |
| 256 | done = result.done |
| 257 | const value = result.value |
| 258 | |
| 259 | if (done) break |
| 260 | |
| 261 | buffer += decoder.decode(value, { stream: true }) |
| 262 | let lineEnd = buffer.indexOf('\n') |
| 263 | |
| 264 | while (lineEnd !== -1) { |
| 265 | const line = buffer.slice(0, lineEnd + 1) |
| 266 | buffer = buffer.slice(lineEnd + 1) |
| 267 | |
| 268 | const lineResult = await handleLine({ |
| 269 | userId, |
| 270 | stripeCustomerId, |
| 271 | agentId, |
| 272 | clientId, |
| 273 | clientRequestId, |
| 274 | costMode, |
| 275 | startTime, |
| 276 | request: auditRequest, |
| 277 | originalModel, |
| 278 | line, |
| 279 | state, |
| 280 | logger, |
| 281 | insertMessage: insertMessageBigquery, |
| 282 | }) |
| 283 | state = lineResult.state |
| 284 | |
| 285 | if (!clientDisconnected) { |
| 286 | try { |
| 287 | controller.enqueue(new TextEncoder().encode(lineResult.patchedLine)) |
nothing calls this directly
no test coverage detected