({
body,
userId,
stripeCustomerId,
agentId,
fetch,
logger,
insertMessageBigquery,
}: {
body: ChatCompletionRequestBody
userId: string
stripeCustomerId?: string | null
agentId: string
fetch: typeof globalThis.fetch
logger: Logger
insertMessageBigquery: InsertMessageBigqueryFn
})
| 225 | } |
| 226 | |
| 227 | export async function handleCanopyWaveStream({ |
| 228 | body, |
| 229 | userId, |
| 230 | stripeCustomerId, |
| 231 | agentId, |
| 232 | fetch, |
| 233 | logger, |
| 234 | insertMessageBigquery, |
| 235 | }: { |
| 236 | body: ChatCompletionRequestBody |
| 237 | userId: string |
| 238 | stripeCustomerId?: string | null |
| 239 | agentId: string |
| 240 | fetch: typeof globalThis.fetch |
| 241 | logger: Logger |
| 242 | insertMessageBigquery: InsertMessageBigqueryFn |
| 243 | }) { |
| 244 | const originalModel = body.model |
| 245 | const startTime = new Date() |
| 246 | const { clientId, clientRequestId, costMode } = extractRequestMetadata({ body, logger }) |
| 247 | const auditRequest = createRequestAuditRecord(body) |
| 248 | |
| 249 | const response = await createCanopyWaveRequest({ body, originalModel, fetch }) |
| 250 | |
| 251 | if (!response.ok) { |
| 252 | throw await parseCanopyWaveError(response) |
| 253 | } |
| 254 | |
| 255 | const reader = response.body?.getReader() |
| 256 | if (!reader) { |
| 257 | throw new Error('Failed to get response reader') |
| 258 | } |
| 259 | |
| 260 | let heartbeatInterval: NodeJS.Timeout |
| 261 | let state: StreamState = { responseText: '', reasoningText: '', ttftMs: null, billedAlready: false } |
| 262 | let clientDisconnected = false |
| 263 | |
| 264 | const stream = new ReadableStream({ |
| 265 | async start(controller) { |
| 266 | const decoder = new TextDecoder() |
| 267 | let buffer = '' |
| 268 | |
| 269 | controller.enqueue( |
| 270 | new TextEncoder().encode(`: connected ${new Date().toISOString()}\n`), |
| 271 | ) |
| 272 | |
| 273 | heartbeatInterval = setInterval(() => { |
| 274 | if (!clientDisconnected) { |
| 275 | try { |
| 276 | controller.enqueue( |
| 277 | new TextEncoder().encode( |
| 278 | `: heartbeat ${new Date().toISOString()}\n\n`, |
| 279 | ), |
| 280 | ) |
| 281 | } catch { |
| 282 | // client disconnected |
| 283 | } |
| 284 | } |
no test coverage detected