MCPcopy
hub / github.com/coder/mux / processStreamWithCleanup

Method processStreamWithCleanup

src/node/services/streamManager.ts:2406–3059  ·  view source on GitHub ↗

* Processes a stream with guaranteed cleanup, regardless of success or failure

(
    workspaceId: WorkspaceId,
    streamInfo: WorkspaceStreamInfo,
    historySequence: number
  )

Source from the content-addressed store, hash-verified

2404 * Processes a stream with guaranteed cleanup, regardless of success or failure
2405 */
2406 private async processStreamWithCleanup(
2407 workspaceId: WorkspaceId,
2408 streamInfo: WorkspaceStreamInfo,
2409 historySequence: number
2410 ): Promise<void> {
2411 this.mcpServerManager?.acquireLease(workspaceId as string);
2412
2413 try {
2414 // Update state to streaming
2415 streamInfo.state = StreamState.STREAMING;
2416
2417 // Emit stream start event (include mode from initialMetadata if available)
2418 this.emitStreamStart(workspaceId, streamInfo, historySequence);
2419
2420 // Initialize token tracker for this model
2421 await this.tokenTracker.setModel(streamInfo.model, streamInfo.metadataModel);
2422
2423 let didRetryPreviousResponseId = false;
2424 let emptyStreamRecoveryAttempts = 0;
2425 const workspaceLog = this.getWorkspaceLogger(workspaceId, streamInfo);
2426 let orphanToolResultCount = 0;
2427
2428 while (true) {
2429 // Use fullStream to capture all events including tool calls
2430 const toolCalls: ToolCallMap = new Map();
2431
2432 try {
2433 for await (const part of streamInfo.streamResult.fullStream) {
2434 // Check if stream was cancelled BEFORE processing any parts
2435 // This improves interruption responsiveness by catching aborts earlier
2436 if (streamInfo.abortController.signal.aborted) {
2437 break;
2438 }
2439
2440 // Log all stream parts to debug reasoning (commented out - too spammy)
2441 // console.log("[DEBUG streamManager]: Stream part", {
2442 // type: part.type,
2443 // hasText: "text" in part,
2444 // preview: "text" in part ? (part as StreamPartWithText).text?.substring(0, 50) : undefined,
2445 // });
2446
2447 switch (part.type) {
2448 case "start-step": {
2449 streamInfo.currentStepStartIndex = streamInfo.parts.length;
2450 break;
2451 }
2452
2453 case "text-delta": {
2454 // Providers/SDKs may stream text deltas under different keys.
2455 const textDeltaPart = part as Record<string, unknown>;
2456
2457 const deltaText = extractChunkDeltaText(textDeltaPart, [
2458 "text",
2459 "delta",
2460 "textDelta",
2461 ]);
2462
2463 if (deltaText.length === 0) {

Callers 1

startStreamMethod · 0.95

Calls 15

emitStreamStartMethod · 0.95
getWorkspaceLoggerMethod · 0.95
appendPartAndEmitMethod · 0.95
schedulePartialWriteMethod · 0.95
checkSoftCancelStreamMethod · 0.95
flushPartialWriteMethod · 0.95
logOrphanToolResultMethod · 0.95
finishToolCallMethod · 0.95

Tested by

no test coverage detected