( stream: AsyncIterable<ChatCompletionChunk>, providerName: string, onComplete?: (content: string, usage: CompletionUsage) => void )
| 1336 | * @returns A ReadableStream that can be used for streaming responses |
| 1337 | */ |
| 1338 | export function createOpenAICompatibleStream( |
| 1339 | stream: AsyncIterable<ChatCompletionChunk>, |
| 1340 | providerName: string, |
| 1341 | onComplete?: (content: string, usage: CompletionUsage) => void |
| 1342 | ): ReadableStream<Uint8Array> { |
| 1343 | const streamLogger = createLogger(`${providerName}Utils`) |
| 1344 | let fullContent = '' |
| 1345 | let promptTokens = 0 |
| 1346 | let completionTokens = 0 |
| 1347 | let totalTokens = 0 |
| 1348 | |
| 1349 | return new ReadableStream({ |
| 1350 | async start(controller) { |
| 1351 | try { |
| 1352 | for await (const chunk of stream) { |
| 1353 | if (chunk.usage) { |
| 1354 | promptTokens = chunk.usage.prompt_tokens ?? 0 |
| 1355 | completionTokens = chunk.usage.completion_tokens ?? 0 |
| 1356 | totalTokens = chunk.usage.total_tokens ?? 0 |
| 1357 | } |
| 1358 | |
| 1359 | const content = chunk.choices?.[0]?.delta?.content || '' |
| 1360 | if (content) { |
| 1361 | fullContent += content |
| 1362 | controller.enqueue(new TextEncoder().encode(content)) |
| 1363 | } |
| 1364 | } |
| 1365 | |
| 1366 | if (onComplete) { |
| 1367 | if (promptTokens === 0 && completionTokens === 0) { |
| 1368 | streamLogger.warn(`${providerName} stream completed without usage data`) |
| 1369 | } |
| 1370 | onComplete(fullContent, { |
| 1371 | prompt_tokens: promptTokens, |
| 1372 | completion_tokens: completionTokens, |
| 1373 | total_tokens: totalTokens || promptTokens + completionTokens, |
| 1374 | }) |
| 1375 | } |
| 1376 | |
| 1377 | controller.close() |
| 1378 | } catch (error) { |
| 1379 | controller.error(error) |
| 1380 | } |
| 1381 | }, |
| 1382 | }) |
| 1383 | } |
| 1384 | |
| 1385 | /** |
| 1386 | * Checks if a forced tool was used in an OpenAI-compatible response and updates tracking. |
no test coverage detected