Processes a stream with guaranteed cleanup, regardless of success or failure.
processStreamWithCleanup(
workspaceId: WorkspaceId,
streamInfo: WorkspaceStreamInfo,
historySequence: number
): Promise<void>
| 2404 | * Processes a stream with guaranteed cleanup, regardless of success or failure |
| 2405 | */ |
| 2406 | private async processStreamWithCleanup( |
| 2407 | workspaceId: WorkspaceId, |
| 2408 | streamInfo: WorkspaceStreamInfo, |
| 2409 | historySequence: number |
| 2410 | ): Promise<void> { |
| 2411 | this.mcpServerManager?.acquireLease(workspaceId as string); |
| 2412 | |
| 2413 | try { |
| 2414 | // Update state to streaming |
| 2415 | streamInfo.state = StreamState.STREAMING; |
| 2416 | |
| 2417 | // Emit stream start event (include mode from initialMetadata if available) |
| 2418 | this.emitStreamStart(workspaceId, streamInfo, historySequence); |
| 2419 | |
| 2420 | // Initialize token tracker for this model |
| 2421 | await this.tokenTracker.setModel(streamInfo.model, streamInfo.metadataModel); |
| 2422 | |
| 2423 | let didRetryPreviousResponseId = false; |
| 2424 | let emptyStreamRecoveryAttempts = 0; |
| 2425 | const workspaceLog = this.getWorkspaceLogger(workspaceId, streamInfo); |
| 2426 | let orphanToolResultCount = 0; |
| 2427 | |
| 2428 | while (true) { |
| 2429 | // Use fullStream to capture all events including tool calls |
| 2430 | const toolCalls: ToolCallMap = new Map(); |
| 2431 | |
| 2432 | try { |
| 2433 | for await (const part of streamInfo.streamResult.fullStream) { |
| 2434 | // Check if stream was cancelled BEFORE processing any parts |
| 2435 | // This improves interruption responsiveness by catching aborts earlier |
| 2436 | if (streamInfo.abortController.signal.aborted) { |
| 2437 | break; |
| 2438 | } |
| 2439 | |
| 2440 | // Log all stream parts to debug reasoning (commented out - too spammy) |
| 2441 | // console.log("[DEBUG streamManager]: Stream part", { |
| 2442 | // type: part.type, |
| 2443 | // hasText: "text" in part, |
| 2444 | // preview: "text" in part ? (part as StreamPartWithText).text?.substring(0, 50) : undefined, |
| 2445 | // }); |
| 2446 | |
| 2447 | switch (part.type) { |
| 2448 | case "start-step": { |
| 2449 | streamInfo.currentStepStartIndex = streamInfo.parts.length; |
| 2450 | break; |
| 2451 | } |
| 2452 | |
| 2453 | case "text-delta": { |
| 2454 | // Providers/SDKs may stream text deltas under different keys. |
| 2455 | const textDeltaPart = part as Record<string, unknown>; |
| 2456 | |
| 2457 | const deltaText = extractChunkDeltaText(textDeltaPart, [ |
| 2458 | "text", |
| 2459 | "delta", |
| 2460 | "textDelta", |
| 2461 | ]); |
| 2462 | |
| 2463 | if (deltaText.length === 0) { |
No test coverage detected.