MCPcopy
hub / github.com/coder/mux / finalizeStreamAndReadMessage

Function finalizeStreamAndReadMessage

src/node/services/streamManager.test.ts:3233–3341  ·  view source on GitHub ↗
(params: {
    workspaceId: string;
    messageId: string;
    historySequence: number;
    startTime: number;
    parts: unknown[];
    initialMetadata?: Record<string, unknown>;
    emitStartEvent?: boolean;
    onStreamStart?: (event: Record<string, unknown>) => void;
    onStreamEnd?: (event: { metadata?: Record<string, unknown> }) => void;
    usage?: {
      inputTokens: number;
      outputTokens: number;
      totalTokens: number;
      reasoningTokens?: number;
    };
    model?: string;
    metadataModel?: string;
    streamManager?: StreamManager;
    beforeProcess?: (params: {
      streamManager: StreamManager;
      workspaceId: string;
      messageId: string;
    }) => Promise<void> | void;
  })

Source from the content-addressed store, hash-verified

3231 }
3232
3233 async function finalizeStreamAndReadMessage(params: {
3234 workspaceId: string;
3235 messageId: string;
3236 historySequence: number;
3237 startTime: number;
3238 parts: unknown[];
3239 initialMetadata?: Record<string, unknown>;
3240 emitStartEvent?: boolean;
3241 onStreamStart?: (event: Record<string, unknown>) => void;
3242 onStreamEnd?: (event: { metadata?: Record<string, unknown> }) => void;
3243 usage?: {
3244 inputTokens: number;
3245 outputTokens: number;
3246 totalTokens: number;
3247 reasoningTokens?: number;
3248 };
3249 model?: string;
3250 metadataModel?: string;
3251 streamManager?: StreamManager;
3252 beforeProcess?: (params: {
3253 streamManager: StreamManager;
3254 workspaceId: string;
3255 messageId: string;
3256 }) => Promise<void> | void;
3257 }) {
3258 const streamManager = params.streamManager ?? new StreamManager(historyService);
3259 // Suppress error events from bubbling up as uncaught exceptions during tests
3260 streamManager.on("error", () => undefined);
3261
3262 if (params.onStreamStart) {
3263 streamManager.on("stream-start", params.onStreamStart);
3264 }
3265 if (params.onStreamEnd) {
3266 streamManager.on("stream-end", params.onStreamEnd);
3267 }
3268
3269 const replaceTokenTrackerResult = Reflect.set(streamManager, "tokenTracker", {
3270 setModel: () => Promise.resolve(undefined),
3271 countTokens: () => Promise.resolve(0),
3272 });
3273 if (!replaceTokenTrackerResult) {
3274 throw new Error("Failed to mock StreamManager.tokenTracker");
3275 }
3276
3277 await appendPartialAssistantForTests(
3278 params.workspaceId,
3279 params.messageId,
3280 params.historySequence
3281 );
3282
3283 const processStreamWithCleanup = getProcessStreamWithCleanupForTests(streamManager);
3284 const usage = params.usage ?? { inputTokens: 4, outputTokens: 6, totalTokens: 10 };
3285 const streamInfo = createStreamInfoForTests({
3286 streamResult: createStreamResultForTests(
3287 (async function* () {
3288 // Tests pre-populate parts but still need the provider's terminal proof of completion.
3289 await Promise.resolve();
3290 yield { type: "finish", finishReason: "stop" };

Callers 1

Calls 10

createStreamInfoForTests · Function · 0.85
getPrivateMethodForTests · Function · 0.85
on · Method · 0.80
set · Method · 0.80
resolve · Method · 0.80

Tested by

no test coverage detected