MCPcopy
hub / github.com/coder/mux / cleanupAbortedStream

Method cleanupAbortedStream

src/node/services/streamManager.ts:1295–1350  ·  view source on GitHub ↗
(
    workspaceId: WorkspaceId,
    streamInfo: WorkspaceStreamInfo,
    abortReason: StreamAbortReason,
    abandonPartial?: boolean
  )

Source from the content-addressed store, hash-verified

1293 }
1294
/**
 * Finalizes an aborted stream for a workspace.
 *
 * Steps, in order: wait for the stream's processing promise, derive usage and
 * duration from the tracked stream state, record session usage, emit the
 * stream-abort event, and remove the workspace's entry from `workspaceStreams`.
 *
 * @param workspaceId - Workspace whose stream was aborted.
 * @param streamInfo - Tracked state of the aborted stream.
 * @param abortReason - Reason forwarded to the abort event.
 * @param abandonPartial - Optional flag forwarded unchanged to the abort event.
 */
private async cleanupAbortedStream(
  workspaceId: WorkspaceId,
  streamInfo: WorkspaceStreamInfo,
  abortReason: StreamAbortReason,
  abandonPartial?: boolean
): Promise<void> {
  // CRITICAL: processing must have fully settled before any cleanup happens.
  // Otherwise the old stream could still be running (e.g. writing partial.json)
  // while a new stream starts.
  await streamInfo.processingPromise;

  // Elapsed time is measured once processing has settled.
  const elapsedMs = Date.now() - streamInfo.startTime;

  // On abort we rely on our own cumulativeUsage rather than the AI SDK's
  // totalUsage: cumulativeUsage is refreshed on every finish-step event (before
  // tool execution), so it stays accurate when interrupted mid-tool-call,
  // whereas totalUsage may come back as zeros or stale data.
  const trackedUsage = streamInfo.cumulativeUsage;
  const abortUsage = hasTokenUsage(trackedUsage) ? trackedUsage : undefined;
  await this.backfillReasoningTokensFromParts(streamInfo, abortUsage);

  // The last step's usage drives the context window display
  // (its inputTokens = current context size).
  const {
    lastStepUsage: contextUsage,
    lastStepProviderMetadata: contextProviderMetadata,
  } = streamInfo;

  // Provider metadata is included so cost calculation stays accurate.
  const costMetadata = markProviderMetadataCostsIncluded(
    streamInfo.cumulativeProviderMetadata,
    streamInfo.initialMetadata?.costsIncluded
  );

  // Mirror the stream-end path: tokens consumed before the abort are still
  // recorded as session usage so they show up in cost display.
  await this.recordSessionUsage(
    workspaceId,
    streamInfo.model,
    abortUsage,
    costMetadata,
    "Failed to record session usage on abort",
    "error",
    streamInfo
  );

  // Abort event payload; usage is undefined when no token usage was tracked.
  const abortPayload = {
    usage: abortUsage,
    contextUsage,
    duration: elapsedMs,
    providerMetadata: costMetadata,
    contextProviderMetadata,
  };
  this.emitStreamAbort(
    workspaceId,
    streamInfo.messageId,
    abortPayload,
    abortReason,
    abandonPartial,
    streamInfo.initialMetadata?.acpPromptId
  );

  // Drop the workspace's stream entry right away.
  this.workspaceStreams.delete(workspaceId);
}
1351
1352 private async recordSessionUsage(

Callers 2

cancelStreamSafely (Method) · 0.95
checkSoftCancelStream (Method) · 0.95

Calls 6

recordSessionUsage (Method) · 0.95
emitStreamAbort (Method) · 0.95
hasTokenUsage (Function) · 0.85
delete (Method) · 0.45

Tested by

no test coverage detected